这一次,让Kotlin Flow 操作符真正好用起来
前言
协程系列文章:
- 一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
- 少年,你可知 Kotlin 协程最初的样子?
- 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇)
- 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇)
- Kotlin 协程调度切换线程是时候解开真相了
- Kotlin 协程之线程池探索之旅(与Java线程池PK)
- Kotlin 协程之取消与异常处理探索之旅(上)
- Kotlin 协程之取消与异常处理探索之旅(下)
- 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用
- 继续来,同我一起撸Kotlin Channel 深水区
- Kotlin 协程 Select:看我如何多路复用
- Kotlin Sequence 是时候派上用场了
- Kotlin Flow啊,你将流向何方?
- Kotlin Flow 背压和线程切换竟然如此相似
- Kotlin SharedFlow&StateFlow 热流到底有多热?
- 狂飙吧,Lifecycle与协程、Flow的化学反应
- 来吧!接受Kotlin 协程–线程池的7个灵魂拷问
- 当,Kotlin Flow与Channel相逢
- 这一次,让Kotlin Flow 操作符真正好用起来
Kotlin Flow 如此受欢迎大部分归功于其丰富、简洁的操作符,巧妙使用Flow操作符可以大大简化我们的程序结构,提升可读性与可维护性。 然而,虽然好用,但有些操作符不太好理解,可惜的是网上大部分文章只是简单介绍其使用,并没有梳理各个操作符的关系以及引入的缘由,本篇将通过关键原理与使用场景串联大部分操作符,以期达到举一反三的效果。 通过本篇文章,你将了解到:
- 操作符全家福
- 单Flow操作符的原理以及使用场景
- 单Flow操作符里的多协程原理以及使用场景
- 多Flow操作符里的多协程原理以及使用场景
- Flow操作符该怎么学?
1. 操作符全家福
红色部分为使用了多协程的操作符 上图仅包含常用官方提供的操作符,其它未包含进来的操作符原理也是类似的,当然我们也可以封装自己的操作符
由图上可知,将操作符分为了三类:
- 构建操作符
- 中间操作符
- 末端操作符
2. 单Flow操作符的原理以及使用场景
最简单的Flow
fun test0() {
runBlocking {
//构造flow
val flow = flow {
//上游
emit("hello world ${Thread.currentThread()}")
}
//收集flow
flow.collect {
//下游
println("collect:$it ${Thread.currentThread()}")
}
}
}
如上包含了两种操作符:构造操作符flow与末端操作符collect。
总结来说,flow调用流程简化为:两个操作符+两个闭包+emit函数:
- collect操作符触发调用,执行了flow的闭包
- flow闭包里调用emit函数,执行了collect闭包
Flow返回集合
collect闭包里仅仅只是打印了数据,有个需求:需要将收集到的数据放在List里。 很容易就想到:
fun test00() {
runBlocking {
val result = mutableListOf<String>()
//构造flow
val flow = flow {
//上游
emit("hello world ${Thread.currentThread()}")
}
//收集flow
flow.collect {
//下游
println("collect:$it ${Thread.currentThread()}")
result.add(it)
}
}
}
如上,定义List变量,在collect的闭包里收到数据后填充到List里。 某天,我们发现这个功能挺常用,需要将它封装起来,外界只需要传入List对象即可。
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C {
collect { value ->
destination.add(value)
}
return destination
}
外部使用:
fun test01() {
runBlocking {
val result = mutableListOf<String>()
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.toList(result)
}
}
如此一看,简单了许多,这也是官方提供的Flow操作符。
原理很简单:
- 作为Flow的扩展函数
- 重写了Flow的collect闭包,也就是FlowCollector的emit函数
后续很多操作符都是这么个套路,比如取Flow的第一个数据:first操作符,比如取对Flow里相邻的两个值做操作:reduce操作符等等。
Flow变换操作符
有个需求:在Flow流到下游之前,对数据进行处理,处理完成后再发射出去。 可以使用transform 操作符。
fun test02() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.transform {
emit("$it man")
}.collect {
println("$it")
}
}
}
再看看原理:
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
//上游的数据先经过transform处理
return@collect transform(value)
}
}
- 依然是Flow扩展函数,返回一个新的Flow对象
- 新Flow对象重写了flow闭包,该闭包里调用collect收集了原始Flow的数据
- 当数据到来后,经过transform处理,而我们自定义的transform闭包里将数据再次发射出去
- 最后新返回的flow的collect闭包被调用
上面只是使用了一个transform操作符,若是多个transform操作符,该怎么去分析呢?其实,套路是有迹可循的。 这里涉及到了一种设计模式:装饰者模式
每调用1个transform操作符就会新生成一个Flow对象,该对象装饰了它的上一个(扩展)对象,如上Flow1装饰原始Flow,Flow2装饰Flow1。
fun test02() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.transform {
emit("$it 1")
}.transform {
emit("$it 2")
}.transform {
emit("$it 3")
}.collect {
println("$it")
}
}
}
如上,相信你很快就知道输出结果了。
你可能觉得transform还需要自己发射数据,有点麻烦,map可解君忧。
fun test03() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
}.map {
"$it 1"
}.collect {
println("$it")
}
}
}
map内部封装了transform。
过滤操作符
有个需求:对上流的数据进行某种条件的筛选过滤。 有了transform的经验,我们很容易想到定义扩展函数返回新的Flow,并重写collect的闭包,在闭包里进行限制。
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
//条件满足再发射
if (predicate(value)) return@transform emit(value)
}
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
collect { value ->
return@collect transform(value)
}
}
使用方式:
fun test04() {
runBlocking {
flow {
//上游
emit("hello world ${Thread.currentThread()}")
emit("fish")
}.filter {
//包含hello字符串才继续往下发送
it.contains("hello")
}.collect {
println("$it")
}
}
}
掌握了以上套路,再去理解其它类似的操作符就很简单了,都是一些简单的变种。
3. 单Flow操作符里的多协程原理以及使用场景
Flow里如何切换协程与线程
上面提到的操作符,如map、filter,相信大家也看出来了:
整个流程的过程没有涉及到其它协程,也没有涉及到其它的线程,是比较单纯也比较容易理解
有个需求:在主线程执行collect操作符,在flow闭包里执行耗时操作。 此时我们就需要flow闭包里的代码在子线程执行。 你可能一下子就说出了答案:使用flowOn操作符。
fun test05() {
runBlocking {
flow {
//上游
println("emit ${Thread.currentThread()}")
emit("hello world")
}.flowOn(Dispatchers.IO)//flowOn 之前的操作符在新协程里执行
.collect {
println("$it")
println("collect ${Thread.currentThread()}")
}
}
}
//打印结果
emit Thread[DefaultDispatcher-worker-1 @coroutine#3,5,main]
hello world
collect Thread[main @coroutine#2,5,main]
可以看出,flow闭包(上游),collect闭包(下游)分别执行在不同的协程以及不同的线程里。 flowOn原理简单来说:
构造了新的协程执行flow闭包,又因为指定了协程分发器为Dispatchers.IO,因此会在子线程里执行flow闭包 原理是基于ChannelFlow
Flow处理背压
有个需求:上游发射数据速度高于下游,如何提升发射效率? 如下:
fun test06() {
runBlocking {
val time = measureTimeMillis {
flow {
//上游
println("emit ${Thread.currentThread()}")
emit("hello world")
delay(1000)
emit("hello world2")
}.collect {
delay(2000)
println("$it")
println("collect ${Thread.currentThread()}")
}
}
println("use time:$time")
}
}
//打印
emit Thread[main @coroutine#2,5,main]
hello world
collect Thread[main @coroutine#2,5,main]
hello world2
collect Thread[main @coroutine#2,5,main]
use time:5024
使用buffer操作符解决背压问题:
fun test06() {
runBlocking {
val time = measureTimeMillis {
flow {
//上游
println("emit ${Thread.currentThread()}")
emit("hello world")
delay(1000)
emit("hello world2")
}.buffer().collect {
delay(2000)
println("$it")
println("collect ${Thread.currentThread()}")
}
}
println("use time:$time")
}
}
//打印结果
emit Thread[main @coroutine#3,5,main]
hello world
collect Thread[main @coroutine#2,5,main]
hello world2
collect Thread[main @coroutine#2,5,main]
use time:4065
可以看出,总耗时减少了。 buffer原理简单来说:
构造了新的协程执行flow闭包,上游数据会发送到Channel 缓冲区里,发送完成继续发送下一条 collect操作符监听缓冲区是否有数据,若有则收集成功 原理是基于ChannelFlow
关于flowOn和buffer更详细的原理请移步:Kotlin Flow 背压和线程切换竟然如此相似
上游覆盖旧数据
有个需求:上游生产速度很快,下游消费速度慢,我们只关心最新数据,旧的数据没价值可以丢掉。 使用conflate操作符处理:
fun test07() {
runBlocking {
flow {
//上游
repeat(5) {
emit("emit $it")
delay(100)
}
}.conflate().collect {
delay(500)
println("$it")
}
}
}
//打印结果:
emit 0
emit 4
可以看出,中间产生的数据由于下游没有来得及消费,被上游新的数据冲刷掉了。
conflate原理简单来说:
相当于使用了buffer操作符,该buffer只能容纳一个数据,新来的数据将会覆盖旧的数据 原理是基于ChannelFlow
Flow变换取最新值
有个需求:在使用transform处理数据的时候,若是它处理比较慢,当有新的值过来后就取消未处理好的值。 使用transformLatest操作符处理:
fun test08() {
runBlocking {
flow {
//上游,协程1
repeat(5) {
emit("emit $it")
}
println("emit ${Thread.currentThread()}")
}.transformLatest {
//协程2
delay(200)
emit("$it fish")
}.collect {
println("collect ${Thread.currentThread()}")
println("$it")
}
}
}
打印结果:
emit Thread[main @coroutine#3,5,main]
collect Thread[main @coroutine#2,5,main]
emit 4 fish
可以看出,由于transform处理速度比较慢,上游有新的数据过来后会取消transform里未处理的数据。 查看源码是如何处理的:
override suspend fun flowCollect(collector: FlowCollector<R>) {
coroutineScope {
var previousFlow: Job? = null
//开始收集上游数据
flow.collect { value ->
previousFlow?.apply {
//若是之前的协程还在,则取消
cancel(ChildCancelledException())
join()
}
//开启协程执行,此处选择不分发新线程
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
collector.transform(value)
}
}
}
}
transformLatest原理简单来说:
构造新的协程1执行flow闭包,收集到数据后再开启新的协程2,在协程里会调用transformLatest的闭包,最终调用collect的闭包 协程1继续发送数据,若是发现协程2还在运行,则取消协程2 原理是基于ChannelFlow
同理,map也有类似的操作符:
fun test09() {
runBlocking {
flow {
//上游
repeat(5) {
emit("emit $it")
}
println("emit ${Thread.currentThread()}")
}.mapLatest {
delay(200)
"$it fish"
}.collect {
println("collect ${Thread.currentThread()}")
println("$it")
}
}
}
//打印结果
emit Thread[main @coroutine#3,5,main]
collect Thread[main @coroutine#2,5,main]
emit 4 fish
收集最新的数据
有个需求:监听下载进度,UI展示最新进度。 分析:此种场景下,我们只是关注最新的进度,没必要频繁刷新UI,因此使用Flow实现时上游发射太快了可以忽略旧的数据。 使用collectLatest操作符实现:
fun test014() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
repeat(100) {
emit(it + 1)
}
}
flow1.collectLatest {
delay(20)
println("collect progress $it")
}
}
println("use time:$time")
}
}
//打印结果
collect progress 100
use time:169
collectLatest原理简单来说:
开启新协程执行flow闭包 若是collect收集比较慢,下一个数据emit过来后会取消未处理的数据 原理是基于ChannelFlow
4. 多Flow操作符里的多协程原理以及使用场景
很多时候我们不止操作单个Flow,有可能需要结合多个Flow来实现特定的业务场景。
展平流
flatMapConcat
有个需求:请求某个学生的班主任信息,这里涉及到两个接口:
- 请求学生信息,使用Flow1表示
- 请求该学生的班主任信息,使用Flow2表示
- 我们需要先拿到学生的信息,通过信息里带的班主任id去请求班主任信息
分析需求可知:获取学生信息的请求和获取班主任信息的请求是串行的,有前后依赖关系。 使用flatMapConcat操作符实现:
fun test010() {
runBlocking {
val flow1 = flow {
emit("stuInfo")
}
flow1.flatMapConcat {
//flow2
flow {
emit("$it teachInfo")
}
}.collect {
println("collect $it")
}
}
}
//打印结果:
collect stuInfo teachInfo
从打印结果可以看出:
所谓展平,实际上就是将两个Flow的数据拍平了输出
当然,你也可以请求多个学生的班主任信息:
fun test011() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
println("emit ${Thread.currentThread()}")
emit("stuInfo 1")
emit("stuInfo 2")
emit("stuInfo 3")
}
flow1.flatMapConcat {
//flow2
flow {
println("flatMapConcat ${Thread.currentThread()}")
emit("$it teachInfo")
delay(1000)
}
}.collect {
println("collect ${Thread.currentThread()}")
println("collect $it")
}
}
println("use time:$time")
}
}
//打印结果:
emit Thread[main @coroutine#2,5,main]
flatMapConcat Thread[main @coroutine#2,5,main]
collect Thread[main @coroutine#2,5,main]
collect stuInfo 1 teachInfo
flatMapConcat Thread[main @coroutine#2,5,main]
collect Thread[main @coroutine#2,5,main]
collect stuInfo 2 teachInfo
flatMapConcat Thread[main @coroutine#2,5,main]
collect Thread[main @coroutine#2,5,main]
collect stuInfo 3 teachInfo
use time:3032
flatMapConcat原理简单来说:
flatMapConcat 并没有涉及到多协程,使用了装饰者模式 先将Flow2使用map进行变换,而后将Flow1、Flow2数据发射出来 Concat顾名思义,将两个Flow连接起来
flatMapMerge
有个需求:在flatMapConcat里,先查询了学生1的班主任信息后才会查询学生2的班主任信息,依照此顺序进行查询。现在需要提升效率,同时查询多个多个学生的班主任信息。 使用flatMapMerge操作符实现:
fun test012() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
println("emit ${Thread.currentThread()}")
emit("stuInfo 1")
emit("stuInfo 2")
emit("stuInfo 3")
}
flow1.flatMapMerge(4) {
//flow2
flow {
println("flatMapMerge ${Thread.currentThread()}")
emit("$it teachInfo")
delay(1000)
}
}.collect {
println("collect ${Thread.currentThread()}")
println("collect $it")
}
}
println("use time:$time")
}
}
//打印结果:
flatMapMerge Thread[main @coroutine#6,5,main]
collect Thread[main @coroutine#2,5,main]
collect stuInfo 1 teachInfo
collect Thread[main @coroutine#2,5,main]
collect stuInfo 2 teachInfo
collect Thread[main @coroutine#2,5,main]
collect stuInfo 3 teachInfo
use time:1086
可以看出,flatMapMerge由于是并发执行,整体速度比flatMapConcat快了很多。 flatMapMerge可以指定并发的数量,当指定flatMapMerge(0)时,flatMapMerge退化为flatMapConcat。 关键源码如下:
override suspend fun collectTo(scope: ProducerScope<T>) {
val semaphore = Semaphore(concurrency)
val collector = SendingCollector(scope)
val job: Job? = coroutineContext[Job]
flow.collect { inner ->
job?.ensureActive()
//并发数限制锁
semaphore.acquire()
scope.launch {
//开启新的协程
try {
//执行flatMapMerge闭包里的flow
inner.collect(collector)
} finally {
semaphore.release() // Release concurrency permit
}
}
}
}
flatMapMerge原理简单来说:
flow1里的每个学生信息会触发去获取班主任信息flow2 新开了协程去执行flow2的闭包 原理是基于ChannelFlow
flatMapLatest
有个需求:flatMapConcat 是线性执行的,可以使用flatMapMerge提升效率。为了节约资源,在请求班主任信息的时候,若是某个学生的班主任信息没有返回,而下一个学生的班主任信息已经开始请求,则取消上一个没有返回的班主任Flow。 使用flatMapLatest操作符实现:
fun test013() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
// println("emit ${Thread.currentThread()}")
emit("stuInfo 1")
emit("stuInfo 2")
emit("stuInfo 3")
}
flow1.flatMapLatest {
//flow2
flow {
// println("flatMapLatest ${Thread.currentThread()}")
delay(1000)
emit("$it teachInfo")
}
}.collect {
// println("collect ${Thread.currentThread()}")
println("collect $it")
}
}
println("use time:$time")
}
}
//打印结果:
collect stuInfo 3 teachInfo
use time:1105
可以看出,只有学生3的班主任信息打印出来了,并且整体时间都减少了。 flatMapLatest原理简单来说:
和transformLatest很相似 原理是基于ChannelFlow
简单总结一下关于收集最新数据的操作符:
transformLatest、mapLatest、collectLatest、flatMapLatest 四者的核心实现都是ChannelFlowTransformLatest,而它最终继承自:ChannelFlow
组合流
combine
有个需求:查询学生的性别以及选修了某个课程。 分析:涉及到两个需求,查询学生性别与查询选修课程,输出结果是:性别:xx,选修了:xx课程。这俩请求可以同时发出,并没有先后顺序,因此我们没必要使用flatMapXX系列操作符。 使用combine操作符:
fun test015() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
emit("stuSex 1")
emit("stuSex 2")
emit("stuSex 3")
}
val flow2 = flow {
emit("stuSubject")
}
flow1.combine(flow2) {
sex, subject->"$sex-->$subject"
}.collect {
println(it)
}
}
println("use time:$time")
}
}
//打印结果:
stuSex 1-->stuSubject
stuSex 2-->stuSubject
stuSex 3-->stuSubject
use time:46
可以看出,flow1的每个emit和flow2的emit关联起来了。 combine操作符有个特点:
短的一方会等待长的一方结束后才结束
看个例子就比较清晰:
fun test016() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
emit("a")
emit("b")
emit("c")
emit("d")
}
val flow2 = flow {
emit("1")
emit("2")
}
flow1.combine(flow2) {
sex, subject->"$sex-->$subject"
}.collect {
println(it)
}
}
println("use time:$time")
}
}
//打印结果
a-->1
b-->2
c-->2
d-->2
use time:45
flow2早就发射到”2”了,会一直等到flow1发射结束。
combine原理简单来说:
zip
在combine需求的基础上,我们又有个优化:无论是学生性别还是学生课程,只要某个Flow获取结束了就取消Flow。 使用zip操作符:
fun test017() {
runBlocking {
val time = measureTimeMillis {
val flow1 = flow {
emit("a")
emit("b")
emit("c")
emit("d")
}
val flow2 = flow {
emit("1")
emit("2")
}
flow1.zip(flow2) {
sex, subject->"$sex-->$subject"
}.collect {
println(it)
}
}
println("use time:$time")
}
}
//打印结果
a-->1
b-->2
use time:71
可以看出flow2先结束了,并且flow1没发送完成。 zip原理简单来说:
可以看出,zip的特点:
短的Flow结束,另一个Flow也结束
5. Flow操作符该怎么学?
以上我们由浅入深分别分析了:
- 单个Flow操作符原理与使用场景
- 单个Flow操作符切换多个协程的原理与使用场景
- 多个Flow操作符切换多个协程的原理与使用场景
以上三者是递进关系,第1点比较简单,第2点难度适中。 尤其是第3点比较难以理解,因为涉及到了其它的知识:Channel、ChannelFlow、多协程、线程切换等。 在之前的文章中有提到过:ChannelFlow是Flow复杂操作符的基础,想要掌握复杂操作符的原理需要明白ChannelFlow的运行机制,有兴趣可移步:当,Kotlin Flow与Channel相逢
建议Flow操作符学习步骤:
- 先会使用简单的操作符filter、map等
- 再学会使用flowOn、buffer、callbackFlow等操作符
- 进而使用flatMapXXX以及combine、zip等操作符
- 最后可以看看其实现原理,达到举一反三应用到实际需求里
Flow操作符的闭坑指南:
- 涉及到多协程的操作符,需要关注其执行的线程环境
- 涉及到多协程的操作符,需要关注协程的生命周期
说实话,Flow操作符要掌握好挺难的,它几乎涉及了协程所有的知识点,也是协程实际应用的精华。这篇是我在协程系列里花费时间最长的文章了(也许也是最后一篇了),即使自己弄明白了,怎样把它很自然地递进引出也是个有挑战的事。 若你能够在本篇的分析中得到一点启发,那说明我的分享是有价值的。 由于篇幅关系,一些操作符debounce、sample等并没有分析,也没有再贴flatMapXXX的源码细节(这部分之前的文章都有分析过),若你有需要可以给我留言评论。
本文基于Kotlin 1.6.1,覆盖所有Flow操作符的demo
您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力
持续更新中,和我一起步步为营系统、深入学习Android/Kotlin
- Android各种Context的前世今生
- Android DecorView 必知必会
- Window/WindowManager 不可不知之事
- View Measure/Layout/Draw 真明白了
- Android事件分发全套服务
- Android invalidate/postInvalidate/requestLayout 彻底厘清
- Android Window 如何确定大小/onMeasure()多次执行原因
- Android事件驱动Handler-Message-Looper解析
- Android 键盘一招搞定
- Android 各种坐标彻底明了
- Android Activity/Window/View 的background
- Android Activity创建到View的显示过
- Android IPC 系列
- Android 存储系列
- Java 并发系列不再疑惑
- Java 线程池系列
- Android Jetpack 前置基础系列
- Android Jetpack 易学易懂系列
- Kotlin 轻松入门系列
- Kotlin 协程系列全面解读
留下评论