当前位置:
首页
文章
后端
详情

kotlin<第十篇>:Flow-异步流

Flow: 是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
流的连续性:流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

flow构建器创建一个函数
返回多个值,而且是异步的,不是一次性返回

(1)构建流的三种方式

// flow构建器创建一个函数
// 返回多个值,而且是异步的,不是一次性返回
suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}
runBlocking {
    // Flow构建方式1
    simpleFlow().collect { value -&gt; println(value) } // 收集元素

    // Flow构建方式2
    (1..5).asFlow().filter {
        it % 2 == 0
    }.map {
        println("Map $it")
    }.onEach {
        delay(1000)
    }.collect {
        println("Collect $it")
    }

    // Flow构建方式3
    flowOf("one", "two", "three").onEach { delay(1000) }.collect { values -&gt;
        println(values)
    }
}

(2)流的上下文

    // Flow上下文验证
    (1..5).asFlow().filter {
        println("当前线程-filter:" + Thread.currentThread().name)
        it % 2 == 0
    }.map {
        println("当前线程-map:" + Thread.currentThread().name)
    }.onEach {
        delay(1000)
    }.collect {
        println("当前线程-collect:" + Thread.currentThread().name)
        println("Collect $it")
    }

从打印结果上看,上游和下游都是在主线程。
但是,一般情况下,Flow构建之后的代码块中是耗时操作,所以不能放在主线程,解决方案是:在Flow构建器后面添加 flowOn(Dispatchers.Default),改造后的代码如下:

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {
        // Flow构建方式1
        simpleFlow().collect { value -&gt; println(value) } // 收集元素

        // Flow构建方式2
        (1..5).asFlow().filter {
            println("当前线程-filter:" + Thread.currentThread().name)
            it % 2 == 0
        }.map {
            println("当前线程-map:" + Thread.currentThread().name)
        }.onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).collect {
            println("当前线程-collect:" + Thread.currentThread().name)
            println("Collect $it")
        }

        // Flow构建方式3
        flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values -&gt;
            println(values)
        }

    }
}

(3)启动流

启动流:launchIn传入协程作用域形参,使用launchIn替换collect我们可以在指定协程中启动流的收集

    (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()


    (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(this).join()

(4)流的取消

使用 withTimeoutOrNull 方式取消:

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {

        withTimeoutOrNull(2000) {
            // Flow构建方式1
            simpleFlow().collect { value -&gt; println(value) } // 收集元素
        }

        withTimeoutOrNull(2000) {
            (1..5).asFlow().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).collect {
                println("Collect $it")
            }
        }

        withTimeoutOrNull(2000) {
            flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values -&gt;
                println(values)
            }
        }

        withTimeoutOrNull(2000) {
            (1..5).asFlow().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
        }

        println("Done...")

    }
}

另外,启动流还可以调用 cancelAndJoin 取消。

    val job = (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO))
    delay(1000)
    job.cancelAndJoin()

(5)流的取消检测

为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...} 发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..5) {
        delay(1000)
        emit(i) // emit自带检测是否取消的能力
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {

        // emit 自带检测是否取消的能力
        simpleFlow().collect { value -&gt;
            if (value == 3) cancel()
        }

        // 如果没有emit,需要使用 cancellable
        (1..5).asFlow().cancellable().onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).collect { value -&gt;
            if (value == 3) cancel()
        }

    }
}

(6)背压

背压:水流受到与流动方向一致的压力。
生产者、消费者模式,只要生产效率 > 消费效率,那么就会产生背压。

处理背压的方式有:

  • buffer(),并发运行流中发射元素的代码
  • conflate(),合并发射项,不对每个值进行处理
  • collectLatest(),取消并重新发射最后一个值
  • 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显示地请求缓冲而不改变执行上下文
suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..50) {
        println("发送数据:$i")
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            simpleFlow()
                .collect { value -&gt;
                delay(300)
                println("接收数据:$value")
            }
        }
        println("耗时:$time")
    }
}

以上代码,发送数据和接收数据都是在同一个线程中并行执行,如果存在耗时程序,将特别影响效率。

为了增加执行效率,可以使用 buffer 设置缓存大小,从而起到加快执行速率的效果。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .buffer(10)
            .collect { value -&gt;
            delay(300)
            println("接收数据:$value")
        }
    }

但是,从生产者/消费者的设计思想的角度上考虑,发送数据最好放在子线程。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .flowOn(Dispatchers.Default)
            .collect { value -&gt;
            delay(300)
            println("接收数据:$value")
        }
    }

使用 flowOn 可以指定 Flow 的协程作用域,这样可以将 并行 转成 并发,从而加快执行效率。

runBlocking {
    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .conflate()
            .collect { value -&gt;
            delay(300)
            println("接收数据==:$value")
        }
    }
    println("耗时:$time")
}

以上代码使用 conflate,中间一些元素不会处理,从而加快执行效率。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .collectLatest { value -&gt;
            delay(300)
            println("接收数据==:$value")
        }

以上代码将 collect 改成 collectLatest 之后,只会处理最后一个值,从而加速执行速度。

(7)转换操作符

使用map转换:

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .map { value -&gt;
                "response $value"
            }
            .collect { value -&gt;
                println(value)
            }
    }
}

使用transform转换,可以转换成任意次、任意值的Flow:

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .transform { request -&gt;
                emit("request $request")
                emit("request $request")
            }
            .collect { value -&gt;
                println(value)
            }
    }
}

(8)限长操作符

take 是限长操作符,可以限制处理的数量:

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .take(2)
            .collect { value -&gt;
                println(value)
            }
    }
}

(9)末端操作符

末端操作符是在流上用于 启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更加方便使用的末端操作符:

  • 转化为各种集合,例如:toList与toSet。
  • 获取第一个(first)值与确保流发射单个(single)值的操作符。
  • 使用reduce与fold将流规约到单个值。
fun main() {
    runBlocking {
        val sum = simpleFlow()
            .reduce { a, b -&gt;
                a + b
            }
        println(sum)
    }
}

reduce 操作符可以将元素累加。
reduce的返回值类型必须和集合的元素类型相符。

suspend fun simpleFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    runBlocking {
        val newStr = simpleFlow()
            .fold(StringBuilder()) { str: StringBuilder, a: Int -&gt;
                str.append(a).append(" ")
            }
        println(newStr)
    }
}

而fold的返回值类型则不受约束。

(10)组合操作符

zip 操作符将两个流合并。

runBlocking {
    val nums1 = (1..3).asFlow()
    val nums2 = flowOf("one", "two", "three")
    nums1.zip(nums2) {a, b -&gt;
        "$a $b"
    }.collect {value-&gt;
        println(value)
    }
}

(11)展平操作符

流表示异步接收的值序列,所以很容易遇到这种情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:

  • flatMapConcat:连接模式
  • flatMapMerge:合并模式
  • flatMapLatest: 最新展平模式
suspend fun requestFlow(i: Int) = flow&lt;String&gt; {
    emit("request $i first")
    delay(500)
    emit("request $i second")
}

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            .flatMapConcat {
                requestFlow(it) // Flow的元素是Flow
            }
            .collect { value-&gt;
            println("$value -- ${System.currentTimeMillis() - startTime}")
        }
    }
}

代码中 flatMapConcat 可以换成 flatMapMerge 或者 flatMapLatest

三者的执行结果是:

flatMapConcat :(requestFlow全部执行完)

request 1 first -- 198
request 1 second -- 701
request 2 first -- 815
request 2 second -- 1319
request 3 first -- 1428
request 3 second -- 1932

flatMapMerge:(不需要等待requestFlow全部执行完)

request 1 first -- 281
request 2 first -- 361
request 3 first -- 470
request 1 second -- 798
request 2 second -- 876
request 3 second -- 985

flatMapLatest:

request 1 first -- 250
request 2 first -- 376
request 3 first -- 485
request 3 second -- 1001

(12)流的异常处理

suspend fun requestFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        emit(i)
        throw RuntimeException("exception")
    }
}.catch {e: Throwable -&gt;
    println("上游异常捕获:" + e.message)
}

fun main() {
    runBlocking {
        try {
            requestFlow()
                .collect { value-&gt;
                    check(value &lt; 2) // 检查异常
                    println(value)
                }
        } catch (e: Throwable) {
            println("下游异常捕获:" + e.message)
        }
    }
}

check:检查异常,一旦检查到异常,程序crash。
下游通过 try...catch 捕获异常,上游Flow自带 catch 函数。

(13)流的完成

收集完成时,使用 finally,表示收集完成。

suspend fun requestFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    runBlocking {
        try {
            requestFlow().collect { value-&gt; println(value) }
        } finally {
            println("...完成...")
        }
    }
}

使用 onCompletion 也可以表示完成:

suspend fun requestFlow() = flow&lt;Int&gt; {
    for (i in 1..3) {
        emit(i)
        throw RuntimeException("exception")
    }
}.catch {exception-&gt;
    println("catch -&gt; exception:" + exception.message)
}

fun main() {
    runBlocking {

        requestFlow()
            .onCompletion {exception -&gt;
                if (exception != null) { // 异常导致完成
                    println("finish -&gt; exception:" + exception.message)
                } else { // 正常结束
                    println("正常结束")
                }
            }
            .collect { value-&gt; println(value) }

    }
}

onCompletion 可以拿到异常信息,但是不能捕获异常。

(13)Flow实现多路复用

多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。

data class User(val name: String)
data class Response&lt;T&gt;(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {
        val name = "guest"
        // 两个函数
        listOf(::getUserForLocal, ::getUserFromRemote)
            .map { function-&gt;
                function.call(name)
            }
            .map { deferred -&gt;
                flow { emit(deferred.await()) }
            }.merge().collect { user -&gt; println(user) }

    }
}

以上代码用到了反射,需要引入依赖:

implementation 'org.jetbrains.kotlin:kotlin-reflect:1.0.6'

[完...]

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。

同类热门文章

深入了解C++中的new操作符:使用具体实例学习

C++中的new操作符是动态分配内存的主要手段之一。在程序运行时,我们可能需要动态地创建和销毁对象,而new就是为此提供了便利。但是,使用new也常常会引发一些问题,如内存泄漏、空指针等等。因此,本文将通过具体的示例,深入介绍C++中的new操作符,帮助读者更好地掌握其使用。


深入了解C++中的new操作符:使用具体实例学习

怎么用Java反射获取包下所有类? 详细代码实例操作

Java的反射机制就是在运行状态下,对于任何一个类,它能知道这个类的所有属性和方法;对于任何一个对象,都能调用这个对象的任意一个方法。本篇文章将通过具体的代码示例,展示如何通过Java反射来获取包下的所有类。


怎么用Java反射获取包下所有类? 详细代码实例操作

员工线上学习考试系统

有点播,直播,在线支付,三级分销等功能,可以对学员学习情况的监督监控,有源码,可二次开发。支持外网和局域网私有化部署,经过测试源码完整可用!1、视频点播:视频播放,图文资料,课件下载,章节试学,限时免

员工线上学习考试系统

了解Java中的volati关键字的作用 以及具体使用方法

本篇文章将和大家分享一下Java当中的volatile关键字,下面将为各位小伙伴讲述volatile关键字的作用以及它的具体使用方法。


了解Java中的volati关键字的作用 以及具体使用方法

Java Map 所有的值转为String类型

可以使用 Java 8 中的 Map.replaceAll() 方法将所有的值转为 String 类型: 上面的代码会将 map 中所有的值都转为 String 类型。 HashMap 是 Java

Java Map 所有的值转为String类型