协程,又称轻量级线程,它的实现是在编译器上的,而不是操作系统的。它有多个入口点,可以在指定位置挂起和恢复执行,由用户自由地进行调控。引入它是为了“用同步的语义来解决异步的问题”,即业务逻辑看起来是同步,但实际运行为异步。
并行:多个事件在同一时间发生,并且在同一时间需要处理。
并发:多个事件在相近的一段时间内发生,但是一次只能处理一件事。处理时,以分时的方式进行处理,以达到类似并行的方式。
同步:一个任务在执行时如果需要执行另一个任务,需要等待另一个任务结束后才能继续执行。
异步:一个任务在执行时如果需要执行另一个任务,不必等待另一个任务结束,继续执行。另一个任务在完成后会通过回调或通知等方式将执行结果进行返回。
kotlinfun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
println("开始")
run {
task1()
task2()
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
kotlinimport kotlinx.coroutines.*
fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
println("开始")
runBlocking {
task1()
task2()
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
该版本和1.1中的代码的区别只在于使用了runBlocking函数替换掉了run函数,在运行结果上也并没有不同。
runBlocking
函数
该函数会启动一个协程,并且阻塞当前线程。如果在runBlocking函数中启动的协程中还有子协程,那么它会等待所有子协程完成才会结束。
另外,基于它的定义public actual fun <T> runBlocking(context:CoroutineContext, block: suspend CoroutineScope.() -> T): T {...}
,它可以返回值:
kotlinfun main() {
val result = runBlocking {
println("coroutine start")
//模拟耗时
delay(1000)
println("coroutine end")
return@runBlocking "hello"
}
println("process end $result")
}
// 打印process end hello
kotlinimport kotlinx.coroutines.*
fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
println("开始")
runBlocking {
launch { task1() }
launch { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
这个版本的代码,相比1.2的版本,多调用了launch
函数。从结果上来说,println("在线程${Thread.currentThread()}上调用Task1和Task2")
这一句在task1
函数和task2
函数执行前就执行了,这至少是并发的一个标志。
launch
函数
该函数会启动一个协程,但它不阻塞当前线程。它会返回一个Job
对象,表示一个作业,或者说一个协程的任务,该作业可以用于等待任务完成或取消任务。
如果launch
函数不指定上下文的话,默认情况下launch
函数会用Dispatchers.Default
作为上下文,具体见[[Kotlin协程基础#2.1 显式设置上下文|显式设置上下文]]。
该函数和runBlocking
函数一起,都不被官方推荐在正式项目中使用。
kotlinimport kotlinx.coroutines.*
suspend fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
yield()
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
suspend fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
yield()
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
println("开始")
runBlocking {
launch { task1() }
launch { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
这个版本的代码和1.3相比,多使用了yield
函数。在结果上,可以看到task1
,task2
两个函数的并发执行。
通过yield
和delay
函数,可以实现协程之间的交错调用。
yield
函数
它有好几种解释,比如说让出当前协程的运行权,下调当前协程的优先级等等,效果都是相同的,即:暂停当前协程,并将运行权交给该协程所在的协程上下文中的其他协程。当其他协程被执行完毕或者自己的执行条件满足后,当前协程会被恢复执行。
该函数可以用来协作式地交替执行多个协程。
delay
函数
它会暂时挂起该协程,等到指定时间结束后,再恢复该协程。
暂停,挂起,阻塞:在这份文档中,暂停和挂起同义,指协程出让运行权,让线程执行其他协程。而阻塞指的是协程和线程一起阻塞。
在1.2,1.3中使用的launch
,runBlocking
函数可以传入一个CoroutineContext
,来设置这些函数开启的协程的执行上下文。
常用的上下文如下:
Dispatcher.Default
:在DefaultDispatcher
池中的线程中开始协程。这个池中的线程数等于系统的核数,但不小于2。Dispatcher.IO
:在专用于运行IO密集型任务的池中执行协程。如果线程在IO上被阻塞,并且创建了更多任务,那么这个池的大小可能会增加。Dispatcher.Main
:可用于Android和Swing UI,运行只从main线程更新UI的任务。Dispatcher.Undefined
:当前CoroutineScope的线程策略。kotlinimport kotlinx.coroutines.*
suspend fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
yield()
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
suspend fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
yield()
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
println("开始")
runBlocking {
launch(Dispatchers.Default) { task1() }
launch { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
在这段代码中,task1
函数将在不同的线程中运行。并且此时,task2
中的代码是并发运行,task1
中的代码是并行运行。
关于这个“
task2
中的代码是并发运行,task1
中的代码是并行运行”这句话,我的理解是这样的。 从书上的例子可以看到,task1
是实打实的切换了一个线程:这个切换过程,就是书上所说的“并行运行”。但是这个我确实没法复现出来,最接近的还是:(关注
Task2
的运行结果)它建立在三个task,并且前两个分别暂停了300毫秒,150毫秒之上的。 所以我认为,这里所指的并行和并发并不是传统意义上的,而是只要协程中的代码有线程切换就算并行,如果协程中的代码从头到尾都在一个线程里就算并发。根据这个定义的话,原来的
task1
函数中yield
函数使得其暂停并且下降优先级,但是没有别的协程竞争,所以它恢复执行;这个暂停和恢复的过程发生在DefaultDispatcher
池中的两个线程之间,那task1
确实算并行。
在2.1中,显式的设置上下文是使用的内置的DefaultDispatcher
来确定协程在哪个内置的线程池中运行。如果想要在自己的单线程池中运行协程,需要先创建一个单线程执行器,再通过asCoroutineContext
函数获得一个CoroutineContext
。
kotlinimport kotlinx.coroutines.*
import java.util.concurrent.Executors
suspend fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
yield()
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
suspend fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
yield()
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { context ->
println("开始")
runBlocking {
launch(context) { task1() }
launch { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
}
代码段中的use
函数可以帮助开发者自动关闭所有协程已经结束的执行器。如果不使用use
的话,该执行器不会自动关闭,程序永远都不会终止。(如果显式声明结束执行器的话,需要调用shutdown
函数或shutdownNow
函数)
可以看到Task1
函数确实再自定义的一个线程池中运行。
如果想要自定义一个多线程池,请使用Executors.newFixedThreadPool(线程数)
。
Executor.newSingleThreadExecutor
函数
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序执行。
Executor.newFIxedThreadPool
函数
创建一个定长的线程池,可以控制线程最大并发数,超出的线程会在队列中等待。
**
use
函数
该函数是Kotlin语言的一个语法糖,它用于在使用资源(文件,流等)后自动关闭它们,并且会自动捕捉异常,自动释放不被需要的资源。
如果希望协程在调用方的上下文启动,然后在挂起点后切换到另一个线程,可以使用CoroutineContext
参数和CoroutineStart
参数实现。
kotlinimport kotlinx.coroutines.*
import java.util.concurrent.Executors
suspend fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
yield()
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
suspend fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
yield()
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).asCoroutineDispatcher().use { context ->
println("开始")
runBlocking {
launch(context = context, start = CoroutineStart.UNDISPATCHED) { task1() }
launch { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
}
可以看到Task1
在main线程上启动,然后在挂起点后切换到自定义的线程池中。
CoroutineStart
类
该类是一个枚举类,包括四个值:
Default
:协程创建后立即根据上下文开始调度协程执行。在调度前如果协程被取消,则其直接进入取消状态。LAZY
:协程创建后只有在start/join/await
时才开始调度协程执行。ATOMIC
:协程创建后不可取消(一说如果有挂起点的话只在挂起点前不可取消)UNDISPATCHED
:立即在当前线程(源码文档里说是类似以Dispatchers.Unconfined
启动协程)执行协程,直到遇到第一个挂起点后,之后的执行取决于调度器。CouroutineContext
可以在协程内部,通过withContext
函数实时切换协程运行的上下文。
kotlinimport kotlinx.coroutines.*
import java.util.concurrent.Executors
suspend fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
yield()
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
suspend fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
yield()
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).asCoroutineDispatcher().use { context ->
println("开始")
runBlocking {
println("开始在线程${Thread.currentThread()}上调用Task1和Task2")
withContext(Dispatchers.Default) { task1() }
launch { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
}
withContext
函数
使当前协程切换到指定的线程,并在指定的逻辑执行完成之前,自动把协程切换回去继续执行。
协程可以指定协程名,并且可以在调试程序的时候查看协程名,便于调试。
kotlinimport kotlinx.coroutines.*
import java.util.concurrent.Executors
suspend fun task1() {
println("开始在线程${Thread.currentThread()}执行Task1")
yield()
println("运行在线程${Thread.currentThread()}上的Task1结束")
}
suspend fun task2() {
println("开始在线程${Thread.currentThread()}执行Task2")
yield()
println("运行在线程${Thread.currentThread()}上的Task2结束")
}
fun main() {
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).asCoroutineDispatcher().use { context ->
println("开始")
runBlocking(CoroutineName("top")) {
println("开始在线程${Thread.currentThread()}上调用Task1和Task2")
withContext(Dispatchers.Default) { task1() }
launch(CoroutineName("task runner")) { task2() }
println("在线程${Thread.currentThread()}上调用Task1和Task2")
}
println("结束")
}
}
CoroutineName
类
表示了一个协程的名字,用于在日志或调试器中识别协程。可以通过CoroutineName
函数创建一个CoroutineName
对象,并指定一个字符串作为名字。
async
和await
使用实例:
kotlinimport kotlinx.coroutines.*
fun main() {
runBlocking {
val count: Deferred<Int> = async(Dispatchers.Default) {
println("在${Thread.currentThread()}线程中请求")
Runtime.getRuntime().availableProcessors()
}
println("在${Thread.currentThread()}线程中调用功能")
println("处理器有${count.await()}个线程")
}
}
async
函数
和launch
函数相似,开启一个协程并且立即执行,但是比launch
函数多了一个Deferred
类型的返回。通过这个Deferred
类型的返回,可以获取协程运行的结果
await
函数
获取Deferred
类型的实例,调用该函数,以获取到对应的协程运行的结果。如果该协程没有运行完成,它会通过while(true)
的方式来进行等待。
Deferred
接口
定义语句:public interface Deferred<out T> : Job
,可知它是Job
的子接口。从功能上来看,它就是多了一个能获取结果的方法的Job
。
kotlinimport kotlinx.coroutines.*
class Compute {
fun compute1(n: Long): Long = n * 2
suspend fun compute2(n: Long): Long {
val factor = 2
println("$n received : Thread: ${Thread.currentThread()}")
delay(n * 1000)
val result = n * factor
println("$n, returning $result: Thread: ${Thread.currentThread()}")
return result
}
}
fun main() = runBlocking<Unit> {
val compute = Compute()
launch(Dispatchers.Default) {
compute.compute2(2)
}
launch(Dispatchers.Default) {
compute.compute2(1)
}
}
这段代码的运行结果如下:
可以看到,第一次调用compute2
方法的协程在DefaultDispatcher-worker-2
和DefaultDispatcher-worker-1
两个线程中运行而不丢失数据,这是因为被suspend
关键字声明的compute2
方法使用了一个叫“延续”Continuation
的数据结构。
(展示Kotlin代码的字节码再反编译之后的结果)
可以看到,即使Kotlin代码中该方法只有一个参数,但是Java代码中这里多了一个
Continuation
类型的参数,该类型可以封装函数的部分执行结果。通过这个参数,该函数可以随时被挂起、切换线程和恢复状态而不会丢失数据。
suspend
关键字
suspend
用于暂停执行当前协程,并保存所有局部变量。如需调用suspend
函数,只能从其他suspend
函数中进行调用,或通过使用协程构建器(例如launch
)来启动新的协程,在协程中调用。
kotlinfun primes(start: Int): Sequence<Int> = sequence {
println("Start to look")
var index = start
while (true) {
if (index > 1 && (2 until index).none { i -> index % i == 0 }) {
yield(index)
println("Generating next after $index")
}
index++
}}
fun main() {
for (prime in primes(17)) {
println("Received $prime")
if (prime > 30) break
}
}
代码中,创建了一个比较类似迭代器的东西:
Sequence
,序列。虽然代码中并没有协程相关,但是每次序列运算一次值之后,都会使用之前所提到的Continuation
来保存序列当前状态,以便下次被调用时使用。
Sequence
序列类
Sequence
类可以被视作是一个“惰性集合”,也就是对序列进行处理的时候,只要还没有访问它的元素实际的值,它就不会立即进行操作,而是在被需求时逐个生成其中的元素。这种特性在上面的无限序列中也得到了应用,即它并不会真的去计算无限多个质数,而是每次访问时就寻找下个质数。
序列可以由已知的数据源创建(sequenceOf(1, 2, 3)
,listOf(1,2,3).asSequence()
),或者就像上面的例子一样,使用sequence
函数传入计算值的lambda
表达式来创建。
**
Sequence
序列和Iterable
可迭代对象(如List
集合)
Sequence
序列的惰性特性,使得在某些场合下它操作大量数据时,比一众Iterable
可迭代对象快了不是一点半点。
举例来说,如果现在有两个相同长度相同元素的序列和集合,同时需要进行map
和filter
操作,那么集合会对所有元素循环一次来执行map
操作,并且保存一个中间集合,再对所有元素循环一次来执行filter
操作,从而得出结果。
相较之下,序列只会在使用具体值时开始循环,并且只会循环一次。在这一次循环中,对一个元素依次进行map
和filter
操作,省去了循环两次的计算时间和中间集合的内存空间。
但是,并不是在所有情况下序列都比集合更好。在数据量小的时候,即使对序列操作的时间比集合快很多,但都是几纳秒的事情,实际感受上这两者的操作时间几乎没有什么差别。其次,在按索引访问元素时,集合相比序列有很大的优势,因为集合存储了所有元素的索引,这使得按索引访问元素是恒定的时间复杂度;而序列必须逐项进行。最后,序列不适合作为参数传递给函数,因为每次访问序列都需要计算,而集合一般只计算一次然后存储在内存中(这里的计算次数多少,更多地是想说集合在访问时不再需要更多的计算),而函数可能会对参数进行多次遍历。
综上,序列确实有速度优势,但大部分情况下可迭代对象更加泛用,而序列适合在数据量大,按索引访问少,对整体操作多时使用。
yield
函数
和sequence
函数相配合,它会将一个元素返回给序列的使用者,并且暂停sequence
函数的执行,直到使用者请求下一个元素。还有一个yieldAll
函数,它和yield
函数比较类似,但它会生成多个乃至无限多个值。当它生成的是无限多个值时,它之后的调用都不会再被执行。
take
函数
对一个序列,取出指定个数的元素,返回这些元素组成的新序列。
kotlinval sequence = sequence {
val start = 0
yield(start)
yieldAll(1..5 step 2)
yieldAll(generateSequence(8) { it * 3 })
}
println(sequence.take(7).toList()) // [0, 1, 3, 5, 8, 24, 72]
迭代器的两种写法如下,两种写法都对,效果也是相同的:
kotlinoperator fun ClosedRange<String>.iterator() = iterator {
val next = StringBuilder(start) // 获取起始
val last = endInclusive
while (last >= next.toString() && last.length >= next.length) {
val result = next.toString()
val lastCharacter = next.last()
if (lastCharacter < Char.MAX_VALUE) {
next.setCharAt(next.length - 1, lastCharacter + 1)
} else {
next.append(Char.MIN_VALUE)
} yield(result)
}
}
fun main() {
val string = "A".."Z"
val iterator = string.iterator()
for(i in iterator) {
println(i)
}
}
kotlinoperator fun ClosedRange<String>.iterator() = object : Iterator<String> {
// object: Iterator<String> 这是弄了一个匿名对象来实现这个接口
private val next = StringBuilder(start)
private val last = endInclusive
override fun hasNext() = last >= next.toString() && last.length >= next.length
override fun next(): String {
val result = next.toString()
val lastCharacter = next.last()
if (lastCharacter < Char.MAX_VALUE) {
next.setCharAt(next.length - 1, lastCharacter + 1)
} else {
next.append(Char.MIN_VALUE)
} return result
}
}
fun main() {
val string = "A".."Z"
val iterator = string.iterator()
for(i in iterator) {
println(i)
}
}
现在开始创建一个程序,来获取网易云的一些歌曲信息。
(这里使用Klaxon
库来解析获取的数据)
kotlinimport com.beust.klaxon.Json
import com.beust.klaxon.Klaxon
import java.net.URL
import kotlin.system.measureTimeMillis
class Music(
@Json(name = "success") val code: Boolean,
@Json(name = "song_id") val songId: Int,
@Json(name = "name") val songName: String,
@Json(name = "author") val songAuthor: String,
) {
companion object {
fun getMusicData(code: String): Music? {
val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease"
return Klaxon().parse<Music>(URL(url).readText())
} }}
fun main() {
val format = "%-10s%-20s%-10s"
println(String.format(format, "Code", "Song Name", "Song Author"))
val time = measureTimeMillis {
val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069")
val musicData: List<Music> = musicCode.mapNotNull {
Music.getMusicData(it)
}
musicData.forEach {
println(String.format(format, it.code, it.songName, it.songAuthor))
}
}
println("Time cost $time ms")
}
在5.1中,程序按顺序执行,每次调用都是阻塞调用,这意味着每次请求程序都将等待至请求完成才处理下一个请求。
如果要将其改造为异步程序,首先要将在main()
的主体中对整个代码使用runBlocking()
,之后使用async()
对Music
类的getMusicData()
启动非阻塞调用。这里要注意,如果直接调用async()
,那么协程会在当前执行的协程的上下文中执行,也就是说,启动的协程也将在main
线程中执行。这确实是非阻塞的并发调用,但是调用的执行将在main
线程中交错运行,这不会有任何的性能优势。因此在调用async()
时,需要指定它的上下文,从而使用不同的线程池来启动协程。
kotlinimport com.beust.klaxon.Json
import com.beust.klaxon.Klaxon
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import java.net.URL
import kotlin.system.measureTimeMillis
class Music(
@Json(name = "success") val code: Boolean,
@Json(name = "song_id") val songId: Int,
@Json(name = "name") val songName: String,
@Json(name = "author") val songAuthor: String,
) {
companion object {
fun getMusicData(code: String): Music? {
val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease"
return Klaxon().parse<Music>(URL(url).readText())
} }}
fun main() = runBlocking {
val format = "%-10s%-20s%-10s"
println(String.format(format, "Code", "Song Name", "Song Author"))
val time = measureTimeMillis {
val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069")
val musicData: List<Deferred<Music?>> = musicCode.map {
async(Dispatchers.IO) {
Music.getMusicData(it)
}
}
musicData
.mapNotNull { it.await() }
.forEach { println(String.format(format, it.code, it.songName, it.songAuthor)) }
}
println("Time cost $time ms")
}
(如果不指定async()
的上下文,那么速度和顺序执行差不多↓)
上述例子只是一个最简单的网络请求,因此很难会出现问题。在现实的编程中,网络请求、IO请求、数据库操作等等需要协程的地方都有可能会出现错误。因此对协程的异常处理也是非常重要的。
如果编程中使用launch()
,那么调用方将不会收到异常,因为它是一个自主引导(fire and forget
)的模型。为了演示异常,这次请求的歌曲id有两个是无效的。
fire and forget
模式
fire and forget
,它原本的意思指的意思是一劳永逸(翻译地生硬一点就是发射之后什么都不用管了),也有作“发后即弃”解。在多线程通信,网络通信等领域中,它指的是一种把消息发出去即可,不期待收到消息或者回复的情况。
kotlinimport com.beust.klaxon.Json
import com.beust.klaxon.Klaxon
import kotlinx.coroutines.*
import java.net.URL
import kotlin.system.measureTimeMillis
class Music(
@Json(name = "success") val code: Boolean,
@Json(name = "song_id") val songId: Int,
@Json(name = "name") val songName: String,
@Json(name = "author") val songAuthor: String,
) {
companion object {
fun getMusicData(code: String): Music? {
val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease"
return Klaxon().parse<Music>(URL(url).readText())
} }}
fun main() = runBlocking {
try {
// val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069")
val musicCode = listOf("2102076419", "1969950171", "1111111111", "2222222222")
val jobs = musicCode.map {
launch(Dispatchers.IO + SupervisorJob()) {
val music = Music.getMusicData(it)
println("${music?.songId}, ${music?.songName}")
}
}
jobs.forEach { it.join() }
jobs.forEach { println("Cancelled: ${it.isCancelled}") }
} catch (e: Exception) {
e.printStackTrace()
}
}
可以看到,在这段程序中的try-catch
并没有生效,反而是直接被中止了。因为launch()
启动的协程并不会将异常传播到它们的调用方。如果使用launch()
,则需要设置一个异常处理的CoroutineExceptionHandler
来处理异常。
kotlinimport com.beust.klaxon.Json
import com.beust.klaxon.Klaxon
import kotlinx.coroutines.*
import java.net.URL
import kotlin.system.measureTimeMillis
class Music(
@Json(name = "success") val code: Boolean,
@Json(name = "song_id") val songId: Int,
@Json(name = "name") val songName: String,
@Json(name = "author") val songAuthor: String,
) {
companion object {
fun getMusicData(code: String): Music? {
val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease"
return Klaxon().parse<Music>(URL(url).readText())
} }}
fun main() = runBlocking {
val handler = CoroutineExceptionHandler {context, e ->
println("Caught: ${context[CoroutineName]} ${e.message?.substring(0..28)}")
}
try {
// val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069")
val musicCode = listOf("2102076419", "1111111111", "2222222222", "1969950171")
val jobs = musicCode.map {
launch(Dispatchers.IO + CoroutineName(it) + handler + SupervisorJob()) {
val music = Music.getMusicData(it)
println("${music?.songId}, ${music?.songName}")
}
}
jobs.forEach { it.join() }
jobs.forEach { println("Cancelled: ${it.isCancelled}") }
} catch (e: Exception) {
e.printStackTrace()
}
}
这样它就可以正确地处理异常。
SupervisorJob()
函数
该函数会使启动的子协程的运行失败不会影响到其他子协程,也不会将异常传播到该子协程的父级,让子协程自己处理异常。
与launch()
不同,async()
会返回一个Deferred<T>
实例,该实例将会携带异常并在调用await()
时传递给调用方。
kotlinimport com.beust.klaxon.Json
import com.beust.klaxon.Klaxon
import kotlinx.coroutines.*
import java.net.URL
import kotlin.system.measureTimeMillis
class Music(
@Json(name = "success") val code: Boolean,
@Json(name = "song_id") val songId: Int,
@Json(name = "name") val songName: String,
@Json(name = "author") val songAuthor: String,
) {
companion object {
fun getMusicData(code: String): Music? {
val url = "https://api.vvhan.com/api/music?id=$code&type=song&media=netease"
return Klaxon().parse<Music>(URL(url).readText())
} }}
fun main() = runBlocking {
// val musicCode = listOf("2102076419", "1969950171", "1447572611", "2065574069")
val musicCode = listOf("2102076419", "1111111111", "2222222222", "1969950171")
val jobs = musicCode.map {
async(Dispatchers.IO + SupervisorJob()) {
Music.getMusicData(it)
}
}
jobs.forEach {
try {
val music = it.await()
println("${music?.songId}, ${music?.songName}")
} catch (e: Exception) {
println("Error: ${e.message?.substring(0..28)}")
}
}
}
结果同5.3.1,异常被很好的处理了。
处于挂起点的协程可以被取消,由launch()
返回的Job
对象和async()
返回的Deferred<T>
对象都有一个cancel()
方法和cancelAndJoin()
方法,可以使用这些方法来显式地取消协程。但是,如果如果协程正忙,它将不会收到取消通知,并且可能不会退出。
结构化并发
Kotlin提供结构化并发,其中共享上下文的协程形成一个层次关系。属于一个层次结构的协程遵循一些规则并表现出规定的行为:
CancellationException
。isActive
属性来查看它是否在繁忙时被取消。finally
块中进行。如果不关心某个协程的任务是否完成,可以通过对Job
或Deferred<T>
的实例调用cancel()
或cancelAndJoin()
来取消它。如果该协程正忙,那么取消命令并不会执行。如果该协程正在挂起点上(如yield()
或await()
等),那么中断时会有CancellationException
异常,可以捕获该异常以确定中断的具体信息。
kotlinimport kotlinx.coroutines.*
suspend fun compute(checkActive: Boolean) = coroutineScope {
var count = 0L
val max = 100000000000
while(if (checkActive) {isActive} else (count < max)) {
count++
} if (count == max) {
println("compute, checkActive $checkActive ignored cancellation")
} else {
println("compute, checkActive $checkActive bailed out early")
}}
val url = "http://httpstat.us/200?sleep=2000"
fun getResponse() = java.net.URL(url).readText()
suspend fun fetchResponse(callAsync: Boolean) = coroutineScope {
try {
val response = if (callAsync) {
async { getResponse() }.await()
} else {
getResponse()
}
println(response)
} catch (e: CancellationException) {
println("fetchResponse called with callAsync $callAsync: ${e.message}")
}}
fun main() {
runBlocking {
val job = launch(Dispatchers.Default) {
launch { compute(checkActive = false) }
launch { compute(checkActive = true) }
launch { fetchResponse(callAsync = false) }
launch { fetchResponse(callAsync = true) }
}
println("Let them run ...")
Thread.sleep(1000)
println("OK that's enough, cancel")
job.cancelAndJoin()
}
}
可以看到运行结果中,两个函数在有无挂起点或是否检查isActive
属性之后,中断时的不同表现。有的真的被停止了,有的无视了取消指令运行到协程结束。
假设在开发时,不希望某一个特定的,包含有挂起点的协程会被打断,可以使用withContext()
函数来声明不可中断。(无挂起点的协程不会被中断)
kotlinimport kotlinx.coroutines.*
suspend fun dowork(id: Int, sleep: Long) = coroutineScope {
try {
println("$id: entered $sleep")
delay(sleep)
println("$id: finished nap $sleep")
withContext(NonCancellable) {
println("$id: do not disturb, please")
delay(5000)
println("$id: ok, you can talk to me now")
}
println("$id: outside the restricted context")
println("$id: isActive: $isActive")
} catch (e: CancellationException) {
println("$id, dowork($sleep) was canceled")
}}
fun main() {
runBlocking {
val job = launch(Dispatchers.Default) {
launch { dowork(1, 3000) }
launch { dowork(2, 1000) }
} Thread.sleep(2000)
job.cancel()
println("cancelling")
job.join()
println("done")
}
}
从这段代码中就能很好的看出NonCancellable
上下文的作用。
kotlinimport kotlinx.coroutines.*
import java.net.URL
suspend fun fetchResponse(code: Int, delay: Int) = coroutineScope {
try {
val response = async {
URL("http://httpstat.us/$code?sleep=$delay").readText()
}.await()
println(response)
} catch (e: CancellationException) {
println("${e.message} for fetchResponse $code")
}
}
fun main() {
runBlocking {
val handler = CoroutineExceptionHandler { _, e ->
println("Exception handled: ${e.message}")
}
val job = launch (Dispatchers.IO + SupervisorJob() + handler) {
launch { fetchResponse(200, 5000) }
launch { fetchResponse(202, 1000) }
launch { fetchResponse(404, 2000) }
}
job.join()
}
}
之前的结构化并发中说过,当协程遇到未处理的异常时,它将自动取消,它的父协程也将自动取消,连带着它的同级协程也会一起取消。这段代码就演示了这种行为:404请求遇到了异常,它和它的父协程被取消,连带着200请求被取消。
如果想让404请求的协程不会影响别的协程,可以设置一个监督器:
kotlinimport kotlinx.coroutines.*
import java.net.URL
suspend fun fetchResponse(code: Int, delay: Int) = coroutineScope {
try {
val response = async {
URL("http://httpstat.us/$code?sleep=$delay").readText()
}.await()
println(response)
} catch (e: CancellationException) {
println("${e.message} for fetchResponse $code")
}}
fun main() {
runBlocking {
val handler = CoroutineExceptionHandler { _, e ->
println("Exception handled: ${e.message}")
}
val job = launch (Dispatchers.IO + handler) {
supervisorScope {
launch { fetchResponse(200, 5000) }
launch { fetchResponse(202, 1000) }
launch { fetchResponse(404, 2000) }
}
}
Thread.sleep(4000)
println("200 should still be running at this time")
println("let the parent cancel now")
job.cancel()
job.join()
}
}
这次可以发现,404请求的协程被取消并没有影响200请求的协程,但是父协程被取消后200请求的协程也被取消。
kotlinimport kotlinx.coroutines.*
import java.net.URL
suspend fun fetchResponse(code: Int, delay: Int) = coroutineScope {
try {
val response = async {
URL("http://httpstat.us/$code?sleep=$delay").readText()
}.await()
println(response)
} catch (e: CancellationException) {
println("${e.message} for fetchResponse $code")
}
}
fun main() {
runBlocking {
val handler = CoroutineExceptionHandler { _, e ->
println("Exception handled: ${e.message}")
}
val job = launch (Dispatchers.IO + handler) {
withTimeout(3000) {
launch { fetchResponse(200, 5000) }
launch { fetchResponse(201, 1000) }
launch { fetchResponse(202, 2000) }
}
}
job.join()
}
}
这适用于限时情况下运行协程。
本文作者:御坂19327号
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!