【Kotlin】Flow简介
1 前言
Flow 是 Kotlin 标准库中的一个新的异步流处理框架,旨在简化异步数据流的操作和处理,它提供了一种声明式的方式来处理数据流。
Flow 中一些接口调用有些类似 Sequence(详见 → Sequence简介),协程的使用详见 → 协程。
Flow 有以下特性和概念。
-
异步流(Asynchronous Streams):Flow 允许以一种非阻塞的方式处理一系列的值或事件,这使得在处理大量数据或涉及 IO 操作时能够更加高效。
-
冷流(Cold Flow):只有在收集器(collector)订阅(或启动)了之后才会开始发射(emit)数据。
-
热流(Hot Flow):在创建后就立即开始发射(emit)数据,不管是否有收集器(collector),这会导致收集器可能只接收到部分数据。
-
声明式 API:Flow 提供了一套简洁清晰的操作符,允许以声明式的方式对流进行处理,如 map、filter、reduce 等。
-
协程集成:Flow 构建在协程之上,因此可以与协程一起使用,并且可以利用协程的优势,比如轻量级、顺序性等。
-
取消支持:Flow 支持与协程一样的取消操作,从而释放资源和避免内存泄漏。
-
背压支持:Flow 提供了背压支持,可以通过各种操作符来控制数据的生产和消费速率,防止生产者速度过快导致消费者无法跟上。
Flow 有中间操作和终端操作,如下。
- 中间操作:每次操作返回一个新的 Flow 对象(主要操作有:flowOn、catch、buffer、conflate、filter、distinctUntilChanged、drop、take、map 等)。
- 终端操作:每次操作返回一个值或集合,每个 Flow 只能进行一次终端操作(主要操作有:first、last、count、fold、reduce、collect、toCollection、toSet、toList 等)。
2 创建 Flow
2.1 emptyFlow
public fun emptyFlow(): Flow = EmptyFlow
2.2 flow
1)源码
public fun flow(block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)
2)应用
var coldFlow = flow { emit("A") emit("B") }2.3 MutableSharedFlow
1)源码
public fun MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow2)应用
var hotFlow = MutableSharedFlow() CoroutineScope(Dispatchers.Default).launch { hotFlow.emit("A") hotFlow.emit("B") }2.4 flowOf
1)源码
public fun flowOf(value: T): Flow = flow { emit(value) } public fun flowOf(vararg elements: T): Flow = flow { for (element in elements) { emit(element) } }2)应用
var flow1 = flowOf("A") var flow2 = flowOf("A", "B", "C")2.5 asFlow
2.5.1 () -> T
1)源码
public fun (() -> T).asFlow(): Flow = flow { emit(invoke()) }2)应用
fun main() { var fun2 = { fun1() }.asFlow() } fun fun1(): String { return "xxx" }2.5.2 Iterator
1)源码
public fun Iterator.asFlow(): Flow = flow { forEach { value -> emit(value) } }2)应用
var array = intArrayOf(1, 2, 3) var iterator = array.iterator() var flow = iterator.asFlow()
2.5.3 Sequence
1)源码
public fun Sequence.asFlow(): Flow = flow { forEach { value -> emit(value) } }2)应用
var sequence = sequenceOf(1, 2, 3) var flow = sequence.asFlow()
2.5.4 Array
1)源码
public fun Array.asFlow(): Flow = flow { forEach { value -> emit(value) } }2)应用
var array = arrayOf(1, 2, 3) var flow = array.asFlow()
2.5.5 Range
1)源码
public fun IntRange.asFlow(): Flow = flow { forEach { value -> emit(value) } }2)应用
var range = 1..3 var flow = range.asFlow()
2.6 zip
1)源码
public fun Flow.zip(other: Flow, transform: suspend (T1, T2) -> R): Flow
2)应用
var flow1 = flowOf(1, 3, 5) var flow2 = flowOf("A", "B", "C") // A-1, B-3, C-5 var flow = flow1.zip(flow2) { num, str -> "$str-$num" }3 Flow 冷流和热流
3.1 冷流
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.MainScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch fun main() { val coldFlow = emitFlow() CoroutineScope(Dispatchers.Default).launch { coldFlow.collect { println("CoroutineScope, $it") } } MainScope().launch(Dispatchers.IO) { coldFlow.collect { println("MainScope, $it") } } Thread.sleep(1000) } fun emitFlow(): Flow = flow { for (i in 1..2) { emit("emit-$i") delay(100) } }打印如下。
CoroutineScope, emit-1 MainScope, emit-1 CoroutineScope, emit-2 MainScope, emit-2
说明:可以看到每一个订阅者都可以收到所有消息。
3.2 热流
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.MainScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.launch fun main() { var hotFlow = emitFlow() CoroutineScope(Dispatchers.Default).launch { hotFlow.collect { println("CoroutineScope, $it") } } MainScope().launch(Dispatchers.IO) { hotFlow.collect { println("MainScope, $it") } } Thread.sleep(1000) } fun emitFlow(): MutableSharedFlow { var hotFlow = MutableSharedFlow() CoroutineScope(Dispatchers.Default).launch { for (i in 1..2) { hotFlow.emit("emit-$i") delay(100) } } return hotFlow }打印如下。
MainScope, emit-2 CoroutineScope, emit-2
说明:可以看到每一个订阅者都只收到部分消息。
4 Flow 的中间操作
4.1 源码
// 切换线程 public fun Flow.flowOn(context: CoroutineContext): Flow // 捕获异常 public fun Flow.catch(action: suspend FlowCollector.(Throwable) -> Unit): Flow // 在数据流中使用一个缓冲区来存储数据, 当数据产生速率超过消费速率时, 数据会暂时存储在缓冲区中, 直到有足够的空间将其传递给订阅者。这可以确保数据不会丢失,但可能会占用更多的内存。 public fun Flow.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow // 当数据产生速率超过消费速率时, 跳过一些数据, 只保留最新的数据。这样可以减少内存占用,但会丢失一部分数据。 public fun Flow.conflate(): Flow = buffer(CONFLATED) // 过滤 public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow // 去除相邻的重复元素 public fun Flow.distinctUntilChanged(): Flow // 丢弃前 n 个元素 public fun Flow.drop(count: Int): Flow // 截取前 n 个元素 public fun Flow.take(count: Int): Flow // 映射(T -> R) public inline fun Flow.map(crossinline transform: suspend (value: T) -> R): Flow
4.2 应用
fun main() { var flow = flowOf(4, 9, 1, 8, 5, 7, 7, 5, 3, 6, 2) CoroutineScope(Dispatchers.Default).launch { flow.flowOn(Dispatchers.Default) .catch { println(it.message) }.buffer() .filter { it in 3..7 } // 4, 5, 7, 7, 5, 3, 6 .distinctUntilChanged() // 4, 5, 7, 5, 3, 6 .drop(1) // 5, 7, 5, 3, 6 .take(4) // 5, 7, 5, 3 .map { it * it } // 25, 49, 25, 9 .collect(::println) } Thread.sleep(1000) }5 Flow 的终端操作
5.1 first、last、count
1)源码
// 首元素 public suspend fun Flow.first(): T // 尾元素 public suspend fun Flow.last(): T
2)应用
fun main() { var flow = flowOf(3, 5, 7, 6) CoroutineScope(Dispatchers.Default).launch { println(flow.first()) // 3 println(flow.last()) // 6 println(flow.count()) // 4 } Thread.sleep(1000) }5.2 collect
1)源码
public suspend fun collect(collector: FlowCollector)
2)应用
fun main() { var flow = flowOf(1, 3, 5, 7) CoroutineScope(Dispatchers.Default).launch { flow.collect(::println) // 1, 3, 5, 7 } Thread.sleep(1000) }5.3 fold
1)源码
// 规约运算,定义运算 o, result = (((((i o e1) o e2)) o e3) o e4) o ... public suspend inline fun Flow.fold(initial: R, crossinline operation: suspend (acc: R, value: T) -> R ): R说明:fold 与 reduce 的区别是,fold 有初值,reduce 无初值。
2)应用
fun main() { var flow = flowOf(2, 3, 5) CoroutineScope(Dispatchers.Default).launch { // 10+2+3+5=20 var res1 = flow.fold(10, Integer::sum) println(res1) // 1*1-2*2=-3, (-3)*(-3)-3*3=0, 0*0-5*5=-25 var res2 = flow.fold(1) { e1, e2 -> e1 * e1 - e2 * e2 } println(res2) } Thread.sleep(1000) }5.4 reduce
1)源码
// 规约运算,定义运算 o, result = ((((e1 o e2)) o e3) o e4) o ... public suspend fun Flow.reduce(operation: suspend (accumulator: S, value: T) -> S): S
说明:reduce 与 fold 的区别是,reduce 无初值,fold 有初值。
2)应用
fun main() { var flow = flowOf(1, 3, 5) CoroutineScope(Dispatchers.Default).launch { var sum = flow.reduce(Integer::sum) println(sum) // 9 // 1*1-3*3=-8, (-8)*(-8)-5*5=39 var res = flow.reduce { e1, e2 -> e1 * e1 - e2 * e2 } println(res) // 39 } Thread.sleep(1000) }5.5 集合转换
1)源码
public suspend fun Flow.toCollection(destination: C): C public suspend fun Flow.toSet(destination: MutableSet = LinkedHashSet()): Set public suspend fun Flow.toList(destination: MutableList = ArrayList()): List
2)应用
fun main() { var flow = flowOf(1, 3, 5) CoroutineScope(Dispatchers.Default).launch { var set1 = flow.toCollection(mutableSetOf()) // [1, 3, 5] var list1 = flow.toCollection(mutableListOf()) // [1, 3, 5] var set2 = flow.toSet() // [1, 3, 5] var list2 = flow.toList() // [1, 3, 5] } Thread.sleep(1000) }
