2024-02-28
Kotlin & 客户端
00

目录

1 使用协程并发运行
1.1 从顺序执行开始
1.2 创建一个协程
1.3 启动一个任务
1.4 与挂起点交错调用
2 协程上下文和线程
2.1 显式设置上下文
2.2 在自定义池中运行
2.3 在挂起点后切换线程
2.4 修改CouroutineContext
3 调试线程
3.1 async和await
3.2 看一眼延续
4 创建无限序列
4.1 使用无限序列
4.2 使用iterator函数
5 非阻塞异步编程
5.1 按顺序开始
5.2 进行异步
5.3 异常处理
5.3.1 启动与异常
5.3.2 异步与异常
5.4 取消和超时
5.4.1 取消协程
5.4.2 请勿打扰
5.4.3 监督作业
5.4.4 超时

协程,又称轻量级线程,它的实现是在编译器上的,而不是操作系统的。它有多个入口点,可以在指定位置挂起和恢复执行,由用户自由地进行调控。引入它是为了“用同步的语义来解决异步的问题”,即业务逻辑看起来是同步,但实际运行为异步。

并行:多个事件在同一时间发生,并且在同一时间需要处理。

并发:多个事件在相近的一段时间内发生,但是一次只能处理一件事。处理时,以分时的方式进行处理,以达到类似并行的方式。

同步:一个任务在执行时如果需要执行另一个任务,需要等待另一个任务结束后才能继续执行。

异步:一个任务在执行时如果需要执行另一个任务,不必等待另一个任务结束,继续执行。另一个任务在完成后会通过回调或通知等方式将执行结果进行返回。

1 使用协程并发运行

1.1 从顺序执行开始
kotlin
fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") println("运行在线程${Thread.currentThread()}上的Task1结束") } fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { println("开始") run { task1() task2() println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") }

image.png

1.2 创建一个协程
kotlin
import kotlinx.coroutines.* fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") println("运行在线程${Thread.currentThread()}上的Task1结束") } fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { println("开始") runBlocking { task1() task2() println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") }

image.png

该版本和1.1中的代码的区别只在于使用了runBlocking函数替换掉了run函数,在运行结果上也并没有不同。


runBlocking函数

该函数会启动一个协程,并且阻塞当前线程。如果在runBlocking函数中启动的协程中还有子协程,那么它会等待所有子协程完成才会结束。

另外,基于它的定义public actual fun <T> runBlocking(context:CoroutineContext, block: suspend CoroutineScope.() -> T): T {...},它可以返回值:

kotlin
fun main() { val result = runBlocking { println("coroutine start") //模拟耗时 delay(1000) println("coroutine end") return@runBlocking "hello" } println("process end $result") } // 打印process end hello

1.3 启动一个任务
kotlin
import kotlinx.coroutines.* fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") println("运行在线程${Thread.currentThread()}上的Task1结束") } fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { println("开始") runBlocking { launch { task1() } launch { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") }

image.png

这个版本的代码,相比1.2的版本,多调用了launch函数。从结果上来说,println("在线程${Thread.currentThread()}上调用Task1和Task2")这一句在task1函数和task2函数执行前就执行了,这至少是并发的一个标志。


launch函数

该函数会启动一个协程,但它不阻塞当前线程。它会返回一个Job对象,表示一个作业,或者说一个协程的任务,该作业可以用于等待任务完成或取消任务。

如果launch函数不指定上下文的话,默认情况下launch函数会用Dispatchers.Default作为上下文,具体见[[Kotlin协程基础#2.1 显式设置上下文|显式设置上下文]]。

该函数和runBlocking函数一起,都不被官方推荐在正式项目中使用。


1.4 与挂起点交错调用
kotlin
import kotlinx.coroutines.* suspend fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") yield() println("运行在线程${Thread.currentThread()}上的Task1结束") } suspend fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") yield() println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { println("开始") runBlocking { launch { task1() } launch { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") }

image.png

这个版本的代码和1.3相比,多使用了yield函数。在结果上,可以看到task1task2两个函数的并发执行。

通过yielddelay函数,可以实现协程之间的交错调用。


yield函数

它有好几种解释,比如说让出当前协程的运行权,下调当前协程的优先级等等,效果都是相同的,即:暂停当前协程,并将运行权交给该协程所在的协程上下文中的其他协程。当其他协程被执行完毕或者自己的执行条件满足后,当前协程会被恢复执行。

该函数可以用来协作式地交替执行多个协程。

delay函数

它会暂时挂起该协程,等到指定时间结束后,再恢复该协程。


暂停,挂起,阻塞:在这份文档中,暂停和挂起同义,指协程出让运行权,让线程执行其他协程。而阻塞指的是协程和线程一起阻塞。

2 协程上下文和线程

2.1 显式设置上下文

在1.2,1.3中使用的launchrunBlocking函数可以传入一个CoroutineContext,来设置这些函数开启的协程的执行上下文。

常用的上下文如下:

  • Dispatcher.Default:在DefaultDispatcher池中的线程中开始协程。这个池中的线程数等于系统的核数,但不小于2。
  • Dispatcher.IO:在专用于运行IO密集型任务的池中执行协程。如果线程在IO上被阻塞,并且创建了更多任务,那么这个池的大小可能会增加。
  • Dispatcher.Main:可用于Android和Swing UI,运行只从main线程更新UI的任务。
  • Dispatcher.Undefined:当前CoroutineScope的线程策略。
kotlin
import kotlinx.coroutines.* suspend fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") yield() println("运行在线程${Thread.currentThread()}上的Task1结束") } suspend fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") yield() println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { println("开始") runBlocking { launch(Dispatchers.Default) { task1() } launch { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") }

image.png

在这段代码中,task1函数将在不同的线程中运行。并且此时,task2中的代码是并发运行,task1中的代码是并行运行。

关于这个“task2中的代码是并发运行,task1中的代码是并行运行”这句话,我的理解是这样的。 从书上的例子可以看到,task1是实打实的切换了一个线程: image.png 这个切换过程,就是书上所说的“并行运行”。但是这个我确实没法复现出来,最接近的还是:(关注Task2的运行结果) image.png 它建立在三个task,并且前两个分别暂停了300毫秒,150毫秒之上的。 所以我认为,这里所指的并行和并发并不是传统意义上的,而是只要协程中的代码有线程切换就算并行,如果协程中的代码从头到尾都在一个线程里就算并发。根据这个定义的话,原来的task1函数中yield函数使得其暂停并且下降优先级,但是没有别的协程竞争,所以它恢复执行;这个暂停和恢复的过程发生在DefaultDispatcher池中的两个线程之间,那task1确实算并行。

2.2 在自定义池中运行

在2.1中,显式的设置上下文是使用的内置的DefaultDispatcher来确定协程在哪个内置的线程池中运行。如果想要在自己的单线程池中运行协程,需要先创建一个单线程执行器,再通过asCoroutineContext函数获得一个CoroutineContext

kotlin
import kotlinx.coroutines.* import java.util.concurrent.Executors suspend fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") yield() println("运行在线程${Thread.currentThread()}上的Task1结束") } suspend fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") yield() println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { context -> println("开始") runBlocking { launch(context) { task1() } launch { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") } }

代码段中的use函数可以帮助开发者自动关闭所有协程已经结束的执行器。如果不使用use的话,该执行器不会自动关闭,程序永远都不会终止。(如果显式声明结束执行器的话,需要调用shutdown函数或shutdownNow函数)

image.png

可以看到Task1函数确实再自定义的一个线程池中运行。

如果想要自定义一个多线程池,请使用Executors.newFixedThreadPool(线程数)


Executor.newSingleThreadExecutor函数

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序执行。

Executor.newFIxedThreadPool函数

创建一个定长的线程池,可以控制线程最大并发数,超出的线程会在队列中等待。

**use函数

该函数是Kotlin语言的一个语法糖,它用于在使用资源(文件,流等)后自动关闭它们,并且会自动捕捉异常,自动释放不被需要的资源。


2.3 在挂起点后切换线程

如果希望协程在调用方的上下文启动,然后在挂起点后切换到另一个线程,可以使用CoroutineContext参数和CoroutineStart参数实现。

kotlin
import kotlinx.coroutines.* import java.util.concurrent.Executors suspend fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") yield() println("运行在线程${Thread.currentThread()}上的Task1结束") } suspend fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") yield() println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).asCoroutineDispatcher().use { context -> println("开始") runBlocking { launch(context = context, start = CoroutineStart.UNDISPATCHED) { task1() } launch { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") } }

image.png

可以看到Task1在main线程上启动,然后在挂起点后切换到自定义的线程池中。


CoroutineStart

该类是一个枚举类,包括四个值:

  • Default:协程创建后立即根据上下文开始调度协程执行。在调度前如果协程被取消,则其直接进入取消状态。
  • LAZY:协程创建后只有在start/join/await时才开始调度协程执行。
  • ATOMIC:协程创建后不可取消(一说如果有挂起点的话只在挂起点前不可取消)
  • UNDISPATCHED:立即在当前线程(源码文档里说是类似以Dispatchers.Unconfined启动协程)执行协程,直到遇到第一个挂起点后,之后的执行取决于调度器。

2.4 修改CouroutineContext

可以在协程内部,通过withContext函数实时切换协程运行的上下文。

kotlin
import kotlinx.coroutines.* import java.util.concurrent.Executors suspend fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") yield() println("运行在线程${Thread.currentThread()}上的Task1结束") } suspend fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") yield() println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).asCoroutineDispatcher().use { context -> println("开始") runBlocking { println("开始在线程${Thread.currentThread()}上调用Task1和Task2") withContext(Dispatchers.Default) { task1() } launch { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") } }

image.png


withContext函数

使当前协程切换到指定的线程,并在指定的逻辑执行完成之前,自动把协程切换回去继续执行。


3 调试线程

协程可以指定协程名,并且可以在调试程序的时候查看协程名,便于调试。

kotlin
import kotlinx.coroutines.* import java.util.concurrent.Executors suspend fun task1() { println("开始在线程${Thread.currentThread()}执行Task1") yield() println("运行在线程${Thread.currentThread()}上的Task1结束") } suspend fun task2() { println("开始在线程${Thread.currentThread()}执行Task2") yield() println("运行在线程${Thread.currentThread()}上的Task2结束") } fun main() { Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).asCoroutineDispatcher().use { context -> println("开始") runBlocking(CoroutineName("top")) { println("开始在线程${Thread.currentThread()}上调用Task1和Task2") withContext(Dispatchers.Default) { task1() } launch(CoroutineName("task runner")) { task2() } println("在线程${Thread.currentThread()}上调用Task1和Task2") } println("结束") } }

image.png


CoroutineName

表示了一个协程的名字,用于在日志或调试器中识别协程。可以通过CoroutineName函数创建一个CoroutineName对象,并指定一个字符串作为名字。


3.1 asyncawait

使用实例:

kotlin
import kotlinx.coroutines.* fun main() { runBlocking { val count: Deferred<Int> = async(Dispatchers.Default) { println("在${Thread.currentThread()}线程中请求") Runtime.getRuntime().availableProcessors() } println("在${Thread.currentThread()}线程中调用功能") println("处理器有${count.await()}个线程") } }

image.png


async函数

launch函数相似,开启一个协程并且立即执行,但是比launch函数多了一个Deferred类型的返回。通过这个Deferred类型的返回,可以获取协程运行的结果

await函数

获取Deferred类型的实例,调用该函数,以获取到对应的协程运行的结果。如果该协程没有运行完成,它会通过while(true)的方式来进行等待。

Deferred接口

定义语句:public interface Deferred<out T> : Job,可知它是Job的子接口。从功能上来看,它就是多了一个能获取结果的方法的Job


3.2 看一眼延续
kotlin
import kotlinx.coroutines.* class Compute { fun compute1(n: Long): Long = n * 2 suspend fun compute2(n: Long): Long { val factor = 2 println("$n received : Thread: ${Thread.currentThread()}") delay(n * 1000) val result = n * factor println("$n, returning $result: Thread: ${Thread.currentThread()}") return result } } fun main() = runBlocking<Unit> { val compute = Compute() launch(Dispatchers.Default) { compute.compute2(2) } launch(Dispatchers.Default) { compute.compute2(1) } }

这段代码的运行结果如下:

image.png

可以看到,第一次调用compute2方法的协程在DefaultDispatcher-worker-2DefaultDispatcher-worker-1两个线程中运行而不丢失数据,这是因为被suspend关键字声明的compute2方法使用了一个叫“延续”Continuation的数据结构。

(展示Kotlin代码的字节码再反编译之后的结果) image.png 可以看到,即使Kotlin代码中该方法只有一个参数,但是Java代码中这里多了一个Continuation类型的参数,该类型可以封装函数的部分执行结果。通过这个参数,该函数可以随时被挂起、切换线程和恢复状态而不会丢失数据。


suspend关键字

suspend用于暂停执行当前协程,并保存所有局部变量。如需调用suspend函数,只能从其他suspend函数中进行调用,或通过使用协程构建器(例如launch)来启动新的协程,在协程中调用。


4 创建无限序列

4.1 使用无限序列
kotlin
fun primes(start: Int): Sequence<Int> = sequence { println("Start to look") var index = start while (true) { if (index > 1 && (2 until index).none { i -> index % i == 0 }) { yield(index) println("Generating next after $index") } index++ }} fun main() { for (prime in primes(17)) { println("Received $prime") if (prime > 30) break } }

image.png 代码中,创建了一个比较类似迭代器的东西:Sequence,序列。虽然代码中并没有协程相关,但是每次序列运算一次值之后,都会使用之前所提到的Continuation来保存序列当前状态,以便下次被调用时使用。 image.png


Sequence序列类

Sequence类可以被视作是一个“惰性集合”,也就是对序列进行处理的时候,只要还没有访问它的元素实际的值,它就不会立即进行操作,而是在被需求时逐个生成其中的元素。这种特性在上面的无限序列中也得到了应用,即它并不会真的去计算无限多个质数,而是每次访问时就寻找下个质数。

序列可以由已知的数据源创建(sequenceOf(1, 2, 3)listOf(1,2,3).asSequence()),或者就像上面的例子一样,使用sequence函数传入计算值的lambda表达式来创建。

**Sequence序列和Iterable可迭代对象(如List集合)

Sequence序列的惰性特性,使得在某些场合下它操作大量数据时,比一众Iterable可迭代对象快了不是一点半点。

举例来说,如果现在有两个相同长度相同元素的序列和集合,同时需要进行mapfilter操作,那么集合会对所有元素循环一次来执行map操作,并且保存一个中间集合,再对所有元素循环一次来执行filter操作,从而得出结果。

相较之下,序列只会在使用具体值时开始循环,并且只会循环一次。在这一次循环中,对一个元素依次进行mapfilter操作,省去了循环两次的计算时间和中间集合的内存空间。

但是,并不是在所有情况下序列都比集合更好。在数据量小的时候,即使对序列操作的时间比集合快很多,但都是几纳秒的事情,实际感受上这两者的操作时间几乎没有什么差别。其次,在按索引访问元素时,集合相比序列有很大的优势,因为集合存储了所有元素的索引,这使得按索引访问元素是恒定的时间复杂度;而序列必须逐项进行。最后,序列不适合作为参数传递给函数,因为每次访问序列都需要计算,而集合一般只计算一次然后存储在内存中(这里的计算次数多少,更多地是想说集合在访问时不再需要更多的计算),而函数可能会对参数进行多次遍历。

综上,序列确实有速度优势,但大部分情况下可迭代对象更加泛用,而序列适合在数据量大,按索引访问少,对整体操作多时使用。

yield函数

sequence函数相配合,它会将一个元素返回给序列的使用者,并且暂停sequence函数的执行,直到使用者请求下一个元素。还有一个yieldAll函数,它和yield函数比较类似,但它会生成多个乃至无限多个值。当它生成的是无限多个值时,它之后的调用都不会再被执行。

take函数

对一个序列,取出指定个数的元素,返回这些元素组成的新序列。

kotlin
val sequence = sequence { val start = 0 yield(start) yieldAll(1..5 step 2) yieldAll(generateSequence(8) { it * 3 }) } println(sequence.take(7).toList()) // [0, 1, 3, 5, 8, 24, 72]

4.2 使用iterator函数

迭代器的两种写法如下,两种写法都对,效果也是相同的:

kotlin
operator fun ClosedRange<String>.iterator() = iterator { val next = StringBuilder(start) // 获取起始 val last = endInclusive while (last >= next.toString() && last.length >= next.length) { val result = next.toString() val lastCharacter = next.last() if (lastCharacter < Char.MAX_VALUE) { next.setCharAt(next.length - 1, lastCharacter + 1) } else { next.append(Char.MIN_VALUE) } yield(result) } } fun main() { val string = "A".."Z" val iterator = string.iterator() for(i in iterator) { println(i) } }
kotlin
operator fun ClosedRange<String>.iterator() = object : Iterator<String> { // object: Iterator<String> 这是弄了一个匿名对象来实现这个接口 private val next = StringBuilder(start) private val last = endInclusive override fun hasNext() = last >= next.toString() && last.length >= next.length override fun next(): String { val result = next.toString() val lastCharacter = next.last() if (lastCharacter < Char.MAX_VALUE) { next.setCharAt(next.length - 1, lastCharacter + 1) } else { next.append(Char.MIN_VALUE) } return result } } fun main() { val string = "A".."Z" val iterator = string.iterator() for(i in iterator) { println(i) } }

5 非阻塞异步编程

5.1 按顺序开始

现在开始创建一个程序,来获取网易云的一些歌曲信息。

(这里使用Klaxon库来解析获取的数据)

kotlin
import com.beust.klaxon.Json import com.beust.klaxon.Klaxon import java.net.URL import kotlin.system.measureTimeMillis class Music( @Json(name = "success") val code: Boolean, @Json(name = "song_id") val songId: Int, @Json(name = "name") val songName: String, @Json(name = "author") val songAuthor: String, ) { companion object { fun getMusicData(code: String): Music? { val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease" return Klaxon().parse<Music>(URL(url).readText()) } }} fun main() { val format = "%-10s%-20s%-10s" println(String.format(format, "Code", "Song Name", "Song Author")) val time = measureTimeMillis { val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069") val musicData: List<Music> = musicCode.mapNotNull { Music.getMusicData(it) } musicData.forEach { println(String.format(format, it.code, it.songName, it.songAuthor)) } } println("Time cost $time ms") }

image.png

5.2 进行异步

在5.1中,程序按顺序执行,每次调用都是阻塞调用,这意味着每次请求程序都将等待至请求完成才处理下一个请求。

如果要将其改造为异步程序,首先要将在main()的主体中对整个代码使用runBlocking(),之后使用async()Music类的getMusicData()启动非阻塞调用。这里要注意,如果直接调用async(),那么协程会在当前执行的协程的上下文中执行,也就是说,启动的协程也将在main线程中执行。这确实是非阻塞的并发调用,但是调用的执行将在main线程中交错运行,这不会有任何的性能优势。因此在调用async()时,需要指定它的上下文,从而使用不同的线程池来启动协程。

kotlin
import com.beust.klaxon.Json import com.beust.klaxon.Klaxon import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import java.net.URL import kotlin.system.measureTimeMillis class Music( @Json(name = "success") val code: Boolean, @Json(name = "song_id") val songId: Int, @Json(name = "name") val songName: String, @Json(name = "author") val songAuthor: String, ) { companion object { fun getMusicData(code: String): Music? { val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease" return Klaxon().parse<Music>(URL(url).readText()) } }} fun main() = runBlocking { val format = "%-10s%-20s%-10s" println(String.format(format, "Code", "Song Name", "Song Author")) val time = measureTimeMillis { val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069") val musicData: List<Deferred<Music?>> = musicCode.map { async(Dispatchers.IO) { Music.getMusicData(it) } } musicData .mapNotNull { it.await() } .forEach { println(String.format(format, it.code, it.songName, it.songAuthor)) } } println("Time cost $time ms") }

image.png

(如果不指定async()的上下文,那么速度和顺序执行差不多↓)

image.png

5.3 异常处理

上述例子只是一个最简单的网络请求,因此很难会出现问题。在现实的编程中,网络请求、IO请求、数据库操作等等需要协程的地方都有可能会出现错误。因此对协程的异常处理也是非常重要的。

5.3.1 启动与异常

如果编程中使用launch(),那么调用方将不会收到异常,因为它是一个自主引导(fire and forget)的模型。为了演示异常,这次请求的歌曲id有两个是无效的。


fire and forget模式

fire and forget,它原本的意思指的意思是一劳永逸(翻译地生硬一点就是发射之后什么都不用管了),也有作“发后即弃”解。在多线程通信,网络通信等领域中,它指的是一种把消息发出去即可,不期待收到消息或者回复的情况。


kotlin
import com.beust.klaxon.Json import com.beust.klaxon.Klaxon import kotlinx.coroutines.* import java.net.URL import kotlin.system.measureTimeMillis class Music( @Json(name = "success") val code: Boolean, @Json(name = "song_id") val songId: Int, @Json(name = "name") val songName: String, @Json(name = "author") val songAuthor: String, ) { companion object { fun getMusicData(code: String): Music? { val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease" return Klaxon().parse<Music>(URL(url).readText()) } }} fun main() = runBlocking { try { // val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069") val musicCode = listOf("2102076419", "1969950171", "1111111111", "2222222222") val jobs = musicCode.map { launch(Dispatchers.IO + SupervisorJob()) { val music = Music.getMusicData(it) println("${music?.songId}, ${music?.songName}") } } jobs.forEach { it.join() } jobs.forEach { println("Cancelled: ${it.isCancelled}") } } catch (e: Exception) { e.printStackTrace() } }

image.png

可以看到,在这段程序中的try-catch并没有生效,反而是直接被中止了。因为launch()启动的协程并不会将异常传播到它们的调用方。如果使用launch(),则需要设置一个异常处理的CoroutineExceptionHandler来处理异常。

kotlin
import com.beust.klaxon.Json import com.beust.klaxon.Klaxon import kotlinx.coroutines.* import java.net.URL import kotlin.system.measureTimeMillis class Music( @Json(name = "success") val code: Boolean, @Json(name = "song_id") val songId: Int, @Json(name = "name") val songName: String, @Json(name = "author") val songAuthor: String, ) { companion object { fun getMusicData(code: String): Music? { val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease" return Klaxon().parse<Music>(URL(url).readText()) } }} fun main() = runBlocking { val handler = CoroutineExceptionHandler {context, e -> println("Caught: ${context[CoroutineName]} ${e.message?.substring(0..28)}") } try { // val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069") val musicCode = listOf("2102076419", "1111111111", "2222222222", "1969950171") val jobs = musicCode.map { launch(Dispatchers.IO + CoroutineName(it) + handler + SupervisorJob()) { val music = Music.getMusicData(it) println("${music?.songId}, ${music?.songName}") } } jobs.forEach { it.join() } jobs.forEach { println("Cancelled: ${it.isCancelled}") } } catch (e: Exception) { e.printStackTrace() } }

image.png

这样它就可以正确地处理异常。


SupervisorJob()函数

该函数会使启动的子协程的运行失败不会影响到其他子协程,也不会将异常传播到该子协程的父级,让子协程自己处理异常。


5.3.2 异步与异常

launch()不同,async()会返回一个Deferred<T>实例,该实例将会携带异常并在调用await()时传递给调用方。

kotlin
import com.beust.klaxon.Json import com.beust.klaxon.Klaxon import kotlinx.coroutines.* import java.net.URL import kotlin.system.measureTimeMillis class Music( @Json(name = "success") val code: Boolean, @Json(name = "song_id") val songId: Int, @Json(name = "name") val songName: String, @Json(name = "author") val songAuthor: String, ) { companion object { fun getMusicData(code: String): Music? { val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease" return Klaxon().parse<Music>(URL(url).readText()) } }} fun main() = runBlocking { // val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069") val musicCode = listOf("2102076419", "1111111111", "2222222222", "1969950171") val jobs = musicCode.map { async(Dispatchers.IO + SupervisorJob()) { Music.getMusicData(it) } } jobs.forEach { try { val music = it.await() println("${music?.songId}, ${music?.songName}") } catch (e: Exception) { println("Error: ${e.message?.substring(0..28)}") } } }

image.png

结果同5.3.1,异常被很好的处理了。

5.4 取消和超时

处于挂起点的协程可以被取消,由launch()返回的Job对象和async()返回的Deferred<T>对象都有一个cancel()方法和cancelAndJoin()方法,可以使用这些方法来显式地取消协程。但是,如果如果协程正忙,它将不会收到取消通知,并且可能不会退出。


结构化并发

Kotlin提供结构化并发,其中共享上下文的协程形成一个层次关系。属于一个层次结构的协程遵循一些规则并表现出规定的行为:

  • 如果一个协程共享创建它的协程的上下文,那么它就被认为是创建它的协程的子协程。
  • 父协程只有在其所有子协程完成之后才会完成。
  • 取消父协程就取消了其所有的子协程。
  • 已进入挂起点的协程可能会收到从挂起点抛出的CancellationException
  • 一个忙碌的、不在挂起点的协程,可以通过检查isActive属性来查看它是否在繁忙时被取消。
  • 如果协程有资源需要清理,那么需要在协程的finally块中进行。
  • 未处理的异常将导致协程取消。
  • 如果子协程失败,它将导致父协程被取消,进而导致该协程的同级协程被取消。

5.4.1 取消协程

如果不关心某个协程的任务是否完成,可以通过对JobDeferred<T>的实例调用cancel()cancelAndJoin()来取消它。如果该协程正忙,那么取消命令并不会执行。如果该协程正在挂起点上(如yield()await()等),那么中断时会有CancellationException异常,可以捕获该异常以确定中断的具体信息。

kotlin
import kotlinx.coroutines.* suspend fun compute(checkActive: Boolean) = coroutineScope { var count = 0L val max = 100000000000 while(if (checkActive) {isActive} else (count < max)) { count++ } if (count == max) { println("compute, checkActive $checkActive ignored cancellation") } else { println("compute, checkActive $checkActive bailed out early") }} val url = "http://httpstat.us/200?sleep=2000" fun getResponse() = java.net.URL(url).readText() suspend fun fetchResponse(callAsync: Boolean) = coroutineScope { try { val response = if (callAsync) { async { getResponse() }.await() } else { getResponse() } println(response) } catch (e: CancellationException) { println("fetchResponse called with callAsync $callAsync: ${e.message}") }} fun main() { runBlocking { val job = launch(Dispatchers.Default) { launch { compute(checkActive = false) } launch { compute(checkActive = true) } launch { fetchResponse(callAsync = false) } launch { fetchResponse(callAsync = true) } } println("Let them run ...") Thread.sleep(1000) println("OK that's enough, cancel") job.cancelAndJoin() } }

image.png

可以看到运行结果中,两个函数在有无挂起点或是否检查isActive属性之后,中断时的不同表现。有的真的被停止了,有的无视了取消指令运行到协程结束。

5.4.2 请勿打扰

假设在开发时,不希望某一个特定的,包含有挂起点的协程会被打断,可以使用withContext()函数来声明不可中断。(无挂起点的协程不会被中断)

kotlin
import kotlinx.coroutines.* suspend fun dowork(id: Int, sleep: Long) = coroutineScope { try { println("$id: entered $sleep") delay(sleep) println("$id: finished nap $sleep") withContext(NonCancellable) { println("$id: do not disturb, please") delay(5000) println("$id: ok, you can talk to me now") } println("$id: outside the restricted context") println("$id: isActive: $isActive") } catch (e: CancellationException) { println("$id, dowork($sleep) was canceled") }} fun main() { runBlocking { val job = launch(Dispatchers.Default) { launch { dowork(1, 3000) } launch { dowork(2, 1000) } } Thread.sleep(2000) job.cancel() println("cancelling") job.join() println("done") } }

image.png

从这段代码中就能很好的看出NonCancellable上下文的作用。

5.4.3 监督作业
kotlin
import kotlinx.coroutines.* import java.net.URL suspend fun fetchResponse(code: Int, delay: Int) = coroutineScope { try { val response = async { URL("http://httpstat.us/$code?sleep=$delay").readText() }.await() println(response) } catch (e: CancellationException) { println("${e.message} for fetchResponse $code") } } fun main() { runBlocking { val handler = CoroutineExceptionHandler { _, e -> println("Exception handled: ${e.message}") } val job = launch (Dispatchers.IO + SupervisorJob() + handler) { launch { fetchResponse(200, 5000) } launch { fetchResponse(202, 1000) } launch { fetchResponse(404, 2000) } } job.join() } }

image.png

之前的结构化并发中说过,当协程遇到未处理的异常时,它将自动取消,它的父协程也将自动取消,连带着它的同级协程也会一起取消。这段代码就演示了这种行为:404请求遇到了异常,它和它的父协程被取消,连带着200请求被取消。

如果想让404请求的协程不会影响别的协程,可以设置一个监督器:

kotlin
import kotlinx.coroutines.* import java.net.URL suspend fun fetchResponse(code: Int, delay: Int) = coroutineScope { try { val response = async { URL("http://httpstat.us/$code?sleep=$delay").readText() }.await() println(response) } catch (e: CancellationException) { println("${e.message} for fetchResponse $code") }} fun main() { runBlocking { val handler = CoroutineExceptionHandler { _, e -> println("Exception handled: ${e.message}") } val job = launch (Dispatchers.IO + handler) { supervisorScope { launch { fetchResponse(200, 5000) } launch { fetchResponse(202, 1000) } launch { fetchResponse(404, 2000) } } } Thread.sleep(4000) println("200 should still be running at this time") println("let the parent cancel now") job.cancel() job.join() } }

这次可以发现,404请求的协程被取消并没有影响200请求的协程,但是父协程被取消后200请求的协程也被取消。

5.4.4 超时
kotlin
import kotlinx.coroutines.* import java.net.URL suspend fun fetchResponse(code: Int, delay: Int) = coroutineScope { try { val response = async { URL("http://httpstat.us/$code?sleep=$delay").readText() }.await() println(response) } catch (e: CancellationException) { println("${e.message} for fetchResponse $code") } } fun main() { runBlocking { val handler = CoroutineExceptionHandler { _, e -> println("Exception handled: ${e.message}") } val job = launch (Dispatchers.IO + handler) { withTimeout(3000) { launch { fetchResponse(200, 5000) } launch { fetchResponse(201, 1000) } launch { fetchResponse(202, 2000) } } } job.join() } }

image.png

这适用于限时情况下运行协程。

本文作者:御坂19327号

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!