【Kotlin】Flow简介

2024-07-04 1467阅读

1 前言

        Flow 是 Kotlin 标准库中的一个新的异步流处理框架,旨在简化异步数据流的操作和处理,它提供了一种声明式的方式来处理数据流。

【Kotlin】Flow简介
(图片来源网络,侵删)

        Flow 中一些接口调用有些类似 Sequence(详见 → Sequence简介),协程的使用详见 → 协程。

        Flow 有以下特性和概念。

  1. 异步流(Asynchronous Streams):Flow 允许以一种非阻塞的方式处理一系列的值或事件,这使得在处理大量数据或涉及 IO 操作时能够更加高效。

  2. 冷流(Cold Flow):只有在收集器(collector)订阅(或启动)了之后才会开始发射(emit)数据。

  3. 热流(Hot Flow):在创建后就立即开始发射(emit)数据,不管是否有收集器(collector),这会导致收集器可能只接收到部分数据。

  4. 声明式 API:Flow 提供了一套简洁清晰的操作符,允许以声明式的方式对流进行处理,如 map、filter、reduce 等。

  5. 协程集成:Flow 构建在协程之上,因此可以与协程一起使用,并且可以利用协程的优势,比如轻量级、顺序性等。

  6. 取消支持:Flow 支持与协程一样的取消操作,从而释放资源和避免内存泄漏。

  7. 背压支持:Flow 提供了背压支持,可以通过各种操作符来控制数据的生产和消费速率,防止生产者速度过快导致消费者无法跟上。

        Flow 有中间操作和终端操作,如下。

  • 中间操作:每次操作返回一个新的 Flow 对象(主要操作有:flowOn、catch、buffer、conflate、filter、distinctUntilChanged、drop、take、map 等)。
  • 终端操作:每次操作返回一个值或集合,每个 Flow 只能进行一次终端操作(主要操作有:first、last、count、fold、reduce、collect、toCollection、toSet、toList 等)。

    2 创建 Flow

    2.1 emptyFlow

    public fun  emptyFlow(): Flow = EmptyFlow

    2.2 flow

            1)源码

    public fun  flow(block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)

            2)应用

    var coldFlow = flow {  
        emit("A")
        emit("B")
    }

    2.3 MutableSharedFlow

            1)源码

    public fun  MutableSharedFlow(
        replay: Int = 0,
        extraBufferCapacity: Int = 0,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): MutableSharedFlow

            2)应用

    var hotFlow = MutableSharedFlow()
    CoroutineScope(Dispatchers.Default).launch {
        hotFlow.emit("A")
        hotFlow.emit("B")
    }

    2.4 flowOf

            1)源码

    public fun  flowOf(value: T): Flow = flow {
        emit(value)
    }
    public fun  flowOf(vararg elements: T): Flow = flow {
        for (element in elements) {
            emit(element)
        }
    }

            2)应用

    var flow1 = flowOf("A")
    var flow2 = flowOf("A", "B", "C")

    2.5 asFlow

    2.5.1 () -> T

            1)源码

    public fun  (() -> T).asFlow(): Flow = flow {
        emit(invoke())
    }

            2)应用

    fun main() {
        var fun2 = { fun1() }.asFlow()
    }
    fun fun1(): String {
        return "xxx"
    }

    2.5.2 Iterator

            1)源码

    public fun  Iterator.asFlow(): Flow = flow {
        forEach { value ->
            emit(value)
        }
    }

            2)应用

    var array = intArrayOf(1, 2, 3)
    var iterator = array.iterator()
    var flow = iterator.asFlow()

    2.5.3 Sequence

            1)源码

    public fun  Sequence.asFlow(): Flow = flow {
        forEach { value ->
            emit(value)
        }
    }

            2)应用

    var sequence = sequenceOf(1, 2, 3)
    var flow = sequence.asFlow()

    2.5.4 Array

            1)源码

    public fun  Array.asFlow(): Flow = flow {
        forEach { value ->
            emit(value)
        }
    }

            2)应用

    var array = arrayOf(1, 2, 3)
    var flow = array.asFlow()

    2.5.5 Range

            1)源码

    public fun IntRange.asFlow(): Flow = flow {
        forEach { value ->
            emit(value)
        }
    }

            2)应用

    var range = 1..3
    var flow = range.asFlow()

    2.6 zip

            1)源码

    public fun  Flow.zip(other: Flow, transform: suspend (T1, T2) -> R): Flow

            2)应用

    var flow1 = flowOf(1, 3, 5)
    var flow2 = flowOf("A", "B", "C")
    // A-1, B-3, C-5
    var flow = flow1.zip(flow2) { num, str ->
        "$str-$num"
    }

    3 Flow 冷流和热流

    3.1 冷流

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.MainScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.launch
    fun main() {
        val coldFlow = emitFlow()
        CoroutineScope(Dispatchers.Default).launch {
            coldFlow.collect {
                println("CoroutineScope, $it")
            }
        }
        MainScope().launch(Dispatchers.IO) {
            coldFlow.collect {
                println("MainScope, $it")
            }
        }
        Thread.sleep(1000)
    }
    fun emitFlow(): Flow = flow {
        for (i in 1..2) {
            emit("emit-$i")
            delay(100)
        }
    }

            打印如下。

    CoroutineScope, emit-1
    MainScope, emit-1
    CoroutineScope, emit-2
    MainScope, emit-2

            说明:可以看到每一个订阅者都可以收到所有消息。

    3.2 热流

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.MainScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.launch
    fun main() {
        var hotFlow = emitFlow()
        CoroutineScope(Dispatchers.Default).launch {
            hotFlow.collect {
                println("CoroutineScope, $it")
            }
        }
        MainScope().launch(Dispatchers.IO) {
            hotFlow.collect {
                println("MainScope, $it")
            }
        }
        Thread.sleep(1000)
    }
    fun emitFlow(): MutableSharedFlow {
        var hotFlow = MutableSharedFlow()
        CoroutineScope(Dispatchers.Default).launch {
            for (i in 1..2) {
                hotFlow.emit("emit-$i")
                delay(100)
            }
        }
        return hotFlow
    }

            打印如下。

    MainScope, emit-2
    CoroutineScope, emit-2

            说明:可以看到每一个订阅者都只收到部分消息。

    4 Flow 的中间操作

    4.1 源码

    // 切换线程
    public fun  Flow.flowOn(context: CoroutineContext): Flow
    // 捕获异常
    public fun  Flow.catch(action: suspend FlowCollector.(Throwable) -> Unit): Flow
    // 在数据流中使用一个缓冲区来存储数据, 当数据产生速率超过消费速率时, 数据会暂时存储在缓冲区中, 直到有足够的空间将其传递给订阅者。这可以确保数据不会丢失,但可能会占用更多的内存。
    public fun  Flow.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow
    // 当数据产生速率超过消费速率时, 跳过一些数据, 只保留最新的数据。这样可以减少内存占用,但会丢失一部分数据。
    public fun  Flow.conflate(): Flow = buffer(CONFLATED)
    // 过滤
    public inline fun  Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow
    // 去除相邻的重复元素
    public fun  Flow.distinctUntilChanged(): Flow
    // 丢弃前 n 个元素
    public fun  Flow.drop(count: Int): Flow
    // 截取前 n 个元素
    public fun  Flow.take(count: Int): Flow
    // 映射(T -> R)
    public inline fun  Flow.map(crossinline transform: suspend (value: T) -> R): Flow

    4.2 应用

    fun main() {
        var flow = flowOf(4, 9, 1, 8, 5, 7, 7, 5, 3, 6, 2)
        CoroutineScope(Dispatchers.Default).launch {
            flow.flowOn(Dispatchers.Default)
                .catch {
                    println(it.message)
                }.buffer()
                .filter { it in 3..7 } // 4, 5, 7, 7, 5, 3, 6
                .distinctUntilChanged() // 4, 5, 7, 5, 3, 6
                .drop(1) // 5, 7, 5, 3, 6
                .take(4) // 5, 7, 5, 3
                .map { it * it } // 25, 49, 25, 9
                .collect(::println)
        }
        Thread.sleep(1000)
    }

    5 Flow 的终端操作

    5.1 first、last、count

            1)源码

    // 首元素
    public suspend fun  Flow.first(): T
    // 尾元素
    public suspend fun  Flow.last(): T

            2)应用

    fun main() {
        var flow = flowOf(3, 5, 7, 6)
        CoroutineScope(Dispatchers.Default).launch {
            println(flow.first()) // 3
            println(flow.last()) // 6
            println(flow.count()) // 4
        }
        Thread.sleep(1000)
    }

    5.2 collect

            1)源码

    public suspend fun collect(collector: FlowCollector)

            2)应用

    fun main() {
        var flow = flowOf(1, 3, 5, 7)
        CoroutineScope(Dispatchers.Default).launch {
            flow.collect(::println) // 1, 3, 5, 7
        }
        Thread.sleep(1000)
    }

    5.3 fold

            1)源码

    // 规约运算,定义运算 o, result = (((((i o e1) o e2)) o e3) o e4) o ...
    public suspend inline fun  Flow.fold(initial: R,
        crossinline operation: suspend (acc: R, value: T) -> R
    ): R

            说明:fold 与 reduce 的区别是,fold 有初值,reduce 无初值。

            2)应用

    fun main() {
        var flow = flowOf(2, 3, 5)
        CoroutineScope(Dispatchers.Default).launch {
            // 10+2+3+5=20
            var res1 = flow.fold(10, Integer::sum)
            println(res1)
            // 1*1-2*2=-3, (-3)*(-3)-3*3=0, 0*0-5*5=-25
            var res2 = flow.fold(1) { e1, e2 ->
                e1 * e1 - e2 * e2
            }
            println(res2)
        }
        Thread.sleep(1000)
    }

    5.4 reduce

            1)源码

    // 规约运算,定义运算 o, result = ((((e1 o e2)) o e3) o e4) o ...
    public suspend fun  Flow.reduce(operation: suspend (accumulator: S, value: T) -> S): S

            说明:reduce 与 fold 的区别是,reduce 无初值,fold 有初值。

            2)应用

    fun main() {
        var flow = flowOf(1, 3, 5)
        CoroutineScope(Dispatchers.Default).launch {
            var sum = flow.reduce(Integer::sum)
            println(sum) // 9
            // 1*1-3*3=-8, (-8)*(-8)-5*5=39
            var res = flow.reduce { e1, e2 ->
                e1 * e1 - e2 * e2
            }
            println(res) // 39
        }
        Thread.sleep(1000)
    }

    5.5 集合转换

            1)源码

    public suspend fun  Flow.toCollection(destination: C): C
    public suspend fun  Flow.toSet(destination: MutableSet = LinkedHashSet()): Set
    public suspend fun  Flow.toList(destination: MutableList = ArrayList()): List

            2)应用

    fun main() {
        var flow = flowOf(1, 3, 5)
        CoroutineScope(Dispatchers.Default).launch {
            var set1 = flow.toCollection(mutableSetOf()) // [1, 3, 5]
            var list1 = flow.toCollection(mutableListOf()) // [1, 3, 5]
            var set2 = flow.toSet() // [1, 3, 5]
            var list2 = flow.toList() // [1, 3, 5]
        }
        Thread.sleep(1000)
    }
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]