코루틴 관련 포스팅은 아래의 포스팅들을 같이 순차적으로 참고하시면 좋습니다. 클릭하면 해당 포스팅으로 이동합니다.
1. 코루틴 기본 개념
2. 코루틴 빌더
3. Job과 코루틴의 라이프 사이클
4. 코루틴 컨텍스트와 스코프
5. Flow 기본 개념
6. Flow 핵심 API 정복하기
7. Flow 제어하기 (예외, 완료 등)
이전 포스팅에는 Flow의 기본 개념과 특징들에 대해서 서술했다. 이번에는 중간 연산자(intermediate operator), 종결 연산자(terminal operator), 버퍼링(buffering), 여러 코루틴의 조합(composing), 평탄화(flattening) 들과 관련하여 핵심 API들을 상세하게 살펴보도록 하자.
중간 연산자
`Flow`는 `Collection`, `Sequence` 처럼 중간 연산을 통해 그 흐름을 변환할 수 있다. 이전에 말했지만 이 중간 연산자 역시 Cold Stream과 같은 방식으로 진행된다. 즉, 종결 연산자가 호출되지 않는 한 수행되지 않는다. 따라서 중간 연산자 역시 중단 함수가 아니다. 핵심 중간 연산자를 알아보자.
map
`Collection`의 `map`함수와 동일하다. 배출되는 값을 변경한다.
fun main() = runBlocking {
(1 .. 3).asFlow()
.map { it * it }
.collect { print("$it ") } // 1 4 9
}
1 ~ 3 까지의 수를 `Flow`로 만든 후` map`을 통해 배출되는 값을 원래 값의 제곱으로 변환했다.
filter
`Collection`의 `filter`와 동일하다. 배출되는 값을 필터링한다.
fun main() = runBlocking {
(1 .. 3).asFlow()
.filter { it == 2 }
.collect { print("$it") } // 2
}
배출되는 값이 2인 것만 필터링해서 배출되는 값을 변경했다.
transform
위의 `map`, `filter`보다 보다 더 복잡한 변환을 수행할 수 있도록 설계된 중간 연산자이다. `map`, `filter`는 각각 `(T) -> R`, `(T) -> Boolean` 형식의 람다를 필요로 했지만, `transform`은 `FlowCollector`가 수신객체로 포함된 람다(즉 `FlowCollector`의 확장함수)로 선언되어 있기 때문에 `emit`도 내부 블록에서 호출이 가능하다.
fun main() = runBlocking {
(1 .. 3).asFlow()
.transform {
emit("$it")
emit("*")
emit("$it")
emit("=")
emit("${it * it} ,")
}
.collect { print(it) } // 1*1=1, 2*2=4, 3*3=9,
}
위와 같이 `emit`을 통해 배출시키는 값을 여러 개로 변경 할 수도 있다.
take
`Collection`의 `filter`와 동일하다. 배출되는 값의 수를 제한한다. 그런데 코루틴이기 때문에 오는 차이점이 있는데, 배출되는 값이 제한하려는 값에 도달한다면 연산을 중지시키는 방식으로 작동되기 때문에 코루틴의 중지 매커니즘을 그대로 따른다. 즉, `CancellationException`을 통해 중지시킨다.
fun main(): Unit = runBlocking {
flow {
try {
emit(1)
emit(2)
emit(3)
} catch (e: Exception) {
println(e::class.simpleName)
throw e
}
}.take(2).collect(::println)
}
1, 2, 3을 배출시키는 `Flow`을 만든 다음, 배출되는 값을 2개로 제한했다. `Flow` 내부에는 예외를 잡아서 예외 클래스 이름을 출력해봤다.
1
2
AbortFlowException
결과는 위와같이 2까지 출력이 되고, `AbortFlowException`이 출력된다. `AbortFlowException`은 `CancellationException`을 상속한 예외로 이를 통해 `take`는 `Flow`를 중단하는 원리로 구현되었음을 알 수 있고, `Flow`의 중단 역시 `CancellationException`을 통해 이뤄짐을 알 수 있다.
종결 연산자
종결 연산자는 `Flow`의 수집을 시작하는 일시 중단 함수이다. 이전부터 계속 봤던 `collect`가 대표적인 종결 연산자이다. 하지만 코루틴은 이 외에도 `Flow`를 쉽게 사용할 수 있도록 여러 종결 연산자들을 제공하고 있다. 한번 살펴보자.
toList, toSet
이름 그대로 `Flow`에서 배출되는 값들을 모아 각각 `List`, `Set`으로 반환하는 종결 연산자이다.
fun main(): Unit = runBlocking {
val flow = flow {
emit(1)
emit(2)
emit(3)
}
val mutableList = mutableListOf<Int>()
flow.collect(mutableList::add)
val list = flow.toList()
println(list == mutableList) // true
}
`collect`를 사용해서 배출되는 값을 `List`로 만들려면 가변 리스트를 선언해서 이에 추가하는 방식으로 가능하다. 하지만 `toList`를 사용하면 이러한 과정이 필요가 없다. 이는 `toSet`도 마찬가지이다.
first, single
`Flow`에서 배출되는 첫 번째 값을 가져오는 함수이다. 두 함수의 차이는 배출되는 값을 무조건 1개로 제한을 두는지 안두는지에 따라 다르다.
fun main(): Unit = runBlocking {
val flow = flow {
emit(1)
emit(2)
emit(3)
}
try { println(flow.first()) } // 1
catch (e: Exception) { println(e) }
try { println(flow.single()) }
catch (e: Exception) { println(e) } // IllegalArgumentException: Flow has more than one element
}
`first`의 경우에는 `Flow`에서 배출되는 값과는 상관 없이 무조건 1개만 가져온다. 이후 `Flow`에서 진행되는 연산은 중지시킨다. (`take`와 동일한 원리이다)
반면 `single`의 경우에는 `Flow`에서 배출되는 값이 무조건 1개여야 한다. 아닌 경우 `IllegalArgumentException`을 던진다.
공통적으로 두 함수 모두 `Flow`에서 배출되는 값이 없는 경우에는 `NoSuchElementException`을 던진다.
reduce, fold
`Flow`의 값들을 단일 값으로 줄이는 경우 사용한다. 두 함수는 파라미터로 `(acc: R, value: T) -> R` 람다를 받는데, `acc`는 이전 연산에서 반환된 값을, `value`는 현재 연산에서 `Flow`가 배출한 값을 의미한다. 말로는 설명이 어려우니, `reduce` 함수부터 일단 예제를 살펴보자.
fun main() = runBlocking {
val flow = flow {
emit(1)
emit(2)
emit(3)
}
val reduceResult = flow.reduce { acc, value ->
println("$acc + $value = ${acc + value}")
acc + value
}
println(reduceResult)
}
`acc`에는 이전 `reduce` 블럭에서 연산된 값이, `value`에는 현재 `Flow`에서 배출하는 값이 주어진다. 실행 결과를 보자.
1 + 2 = 3
3 + 3 = 6
6
총 배출은 3번이지만, 처음 1이 배출될 때에는 `acc` 값이 없기 때문에 `reduce` 블럭 내의 연산이 실행되지 않는다.
그림으로 보면 위와 같을 것이다. 위와 같은 순서로 진행이 되어서 결국 하나의 값으로 최종 배출 값이 줄어든다.
`fold` 함수는 초기값을 받는다는 점에서 차이가 있다. 코드를 보자.
fun main() = runBlocking {
val flow = flow {
emit(1)
emit(2)
emit(3)
}
val foldResult = flow.fold(10) { acc, value ->
println("$acc + $value = ${acc + value}")
acc + value
}
println(foldResult)
}
10 + 1 = 11
11 + 2 = 13
13 + 3 = 16
16
`fold`는 초기값으로 넣어준 인자가 첫 `fold` 연산의 `acc`로 들어간다. 따라서 `Flow`에서 첫번째로 배출하는 값에서도 연산이 수행된다.
그림으로 표현하면 위와 같을 것이다.
버퍼링
버퍼링이란 단어, 많이 들어봤을 것이다. 프로그래밍 사전적인 의미로는 데이터의 송수신을 원할하게 하기 위해 메모리에 미리 적재하여 처리속도를 완충하는 것을 의미한다. 유튜브를 보면 버퍼링이 종종 일어나는데, 영상 데이터를 메모리에 미리 적재하여 이후 영상을 끊김없이 부드럽게 보여주기 위함이다. 유튜브의 버퍼링을 코루틴에 대입해보자.
유튜브의 버퍼링은 영상 데이터를 수신받아 메모리에 미리 적재하는 시점, 사용자에게 영상을 보여주는 시점 이렇게 두 개의 시점으로 본다면, 코루틴의 관점에서는 값을 배출하는 시점, 갑을 수집하는 시점으로 대입해볼 수 있다. Flow에서 배출하는 시점과 영상 데이터를 수신받는 시점은 데이터가 생성된다는 점에서 동일하다. 반대로 배출된 값을 수집하는 시점 수신받은 데이터를 영상으로 보여주는 시점은 값을 소비한다는 점에서 동일하다.
그런데 유튜브를 버퍼링 없이 본다면 어떤 문제가 생길까? 쉽게 생각하기 위해 영상 데이터를 정수라고 생각해보자. 1 이라는 영상 데이터를 수신받으면 1 데이터를 표현하기 위해 컴퓨터가 이를 처리해서 화면에 보여줄 것이다. 그런 다음 2라는 영상 데이터를 수신 받고 이를 처리해서 화면에 보여준다. 만약 수신에 1초, 화면 표시에 2초가 걸린다면 1이라는 데이터를 표시하기 위해 도합 3초가 걸린다. 또한 다음 데이터를 수신받기 위해 화면에 표시되는 2초간 다음 데이터 수신을 대기한다.
하지만 버퍼링을 통한다면 영상 데이터는 따로 미리 메모리에 저장을 시키면서, 화면에 표시되기 위해 데이터를 처리하는 것도 따로 진행할 수 있을 것이다.
그림으로 표현하면 위와 같을 것이다. (버퍼링에 대한 정확한 설명은 아니다. 코루틴의 버퍼링을 쉽게 이해하기 위해 위와 같이 표현했다) 따라서 이전에는 데이터 2번이 표현되기까지 원래 6초가 걸렸다면 버퍼링을 통해 4초(데이터 수신 2초 따로, 제일 오래 걸리는 데이터 처리 4초가 걸린다)까지 소요 시간을 줄일 수 있게 되었다.
코루틴도 마찬가지이다. 이 버퍼링을 통해 두 시점을 나누어서 처리하면 소요 시간을 줄일 수 있다. 코루틴의 `buffer` 함수를 살펴보자.
buffer
코루틴의 `buffer`는 데이터의 생산(배출)과 소비(수집)을 서로 다른 코루틴에서 처리되도록 하는 함수이다. 코드를 바로 살펴보자.
fun main(): Unit = runBlocking {
val startTime = System.currentTimeMillis()
val flow = flow {
(1..5).forEach {
delay(100)
log("emit $it -> ${System.currentTimeMillis() - startTime}ms")
emit(it)
}
}
flow
.buffer()
.collect {
delay(300)
log("collect buffer $it -> ${System.currentTimeMillis() - startTime}ms")
}
}
1 ~ 5 까지 배출하는 `Flow`를 만들었고, `buffer`를 통해 서로 다룬 코루틴에서 실행되도록 했다.
[main @coroutine#2] emit 1 -> 127ms
[main @coroutine#2] emit 2 -> 232ms
[main @coroutine#2] emit 3 -> 333ms
[main @coroutine#1] collect buffer 1 -> 432ms
[main @coroutine#2] emit 4 -> 433ms
[main @coroutine#2] emit 5 -> 535ms
[main @coroutine#1] collect buffer 2 -> 736ms
[main @coroutine#1] collect buffer 3 -> 1040ms
[main @coroutine#1] collect buffer 4 -> 1345ms
[main @coroutine#1] collect buffer 5 -> 1651ms
보면 `emit`은 coroutine#2에서, `collect`는 coroutine#1에서 수행된 것을 볼 수 있다. 1이 배출되면 1를 수집하기 시작한다. 수집되는 0.3초간 2와 3이 배출되어 메모리에 저장되고, 1이 수집이 완료된다. 이후에 4와 5가 수집되고 0.6초 뒤에 2가 수집된 것을 볼 수 있다.
코드 12번째 라인의 `buffer()`를 지우고 실행해보자.
[main @coroutine#1] emit 1 -> 111ms
[main @coroutine#1] collect 1 -> 414ms
[main @coroutine#1] emit 2 -> 519ms
[main @coroutine#1] collect 2 -> 824ms
[main @coroutine#1] emit 3 -> 930ms
[main @coroutine#1] collect 3 -> 1236ms
[main @coroutine#1] emit 4 -> 1340ms
[main @coroutine#1] collect 4 -> 1645ms
[main @coroutine#1] emit 5 -> 1750ms
[main @coroutine#1] collect 5 -> 2056ms
차이가 확연히 보인다. 배출과 수집을 같은 코루틴에서 하기 때문에, 배출되는 0.1초를 기다리고 수집되는 0.3초를 기다리고 그 다음 배출이 이루어진다. 따라서 한번 연산을 할 때마다 대략 0.4초가 걸려버린다.
conflate
`buffer`와 동일하게 배출과 수집을 다른 코루틴에서 진행시킨다. 차이점은 수집하는 시점에 마지막으로 배출된 값만 수집한다. 중간에 배출됐던 값들은 수집하지 않는다.
fun main(): Unit = runBlocking {
val startTime = System.currentTimeMillis()
val flow = flow {
(1..5).forEach {
delay(100)
log("emit $it -> ${System.currentTimeMillis() - startTime}ms")
emit(it)
}
}
flow
.conflate()
.collect {
log("collect conflate $it start -> ${System.currentTimeMillis() - startTime}ms")
delay(300)
log("collect conflate $it end -> ${System.currentTimeMillis() - startTime}ms")
}
}
위의 코드에서 `buffer()`를 `conflate()`로 바꿨다. 그리고 `collect` 블록에서 로그를 시작 지점에도 출력하도록 변경했는데, 그 이유는 실제 블록이 시작되는 시점에 마지막으로 배출된 값에 대해 수집을 시작하기 때문이다.
[main @coroutine#2] emit 1 -> 120ms
[main @coroutine#1] collect conflate 1 start -> 124ms
[main @coroutine#2] emit 2 -> 229ms
[main @coroutine#2] emit 3 -> 329ms
[main @coroutine#1] collect conflate 1 end -> 429ms
[main @coroutine#1] collect conflate 3 start -> 429ms // 시작 시점에 마지막으로 배출된 값이 3이다.
[main @coroutine#2] emit 4 -> 429ms
[main @coroutine#2] emit 5 -> 530ms
[main @coroutine#1] collect conflate 3 end -> 734ms
[main @coroutine#1] collect conflate 5 start -> 735ms // 시작 시점에 마지막으로 배출된 값이 5이다.
[main @coroutine#1] collect conflate 5 end -> 1040ms
1번이 배출되어 1번이 수집되었다. 이후 2번이 배출되었는데 아직 1번의 수집이 끝나지 않았기 때문에 2번이 수집되지 않고 있다. 이후 3번이 배출되었을 때 1번의 수집이 끝나 다음 값을 수집하는데, 중간에 배출되었던 2번은 무시된다. 제일 마지막에 배출된 3번을 수집한다.
`conflate`은 보다싶이 배출된 값이 모두 필요하지 않을 경우에 주로 사용한다. 예를 들어 특정 프로세스의 진행도를 1의 단위로 100까지 배출한다고 했을 때, 해당 프로세스의 진행도를 UI에 그리는데 굳이 1의 단위로 모두 필요하지 않은 경우, `conflate`를 사용해볼 수 있을 것이다.
collectLatest
`collectLatest`는 이름 그대로 마지막 값만 수집한다. 근데 동작 방식은 일반적인 코루틴의 버퍼링 처럼 배출과 수집을 서로 다른 코루틴에서 실행시킨다.
우선 `Flow`에서 값을 배출하면 해당 값을 다른 코루틴에서 수집한다. 수집을 처리하는 도중에 `Flow`에서 다른 값이 배출되는 경우 기존에 수집중이던 블럭을 취소시키고 새로운 코루틴을 만들어 배출된 값에 대해 수집을 시작한다. 이 과정을 반복하다 보면, 마지막에 배출되는 값은 수집이 취소되지 않고 완료가 된다. 말로만 이해하기 힘드니 한번 코드를 살펴보자.
fun main(): Unit = runBlocking {
val startTime = System.currentTimeMillis()
val flow = flow {
(1..5).forEach {
delay(100)
log("emit $it -> ${System.currentTimeMillis() - startTime}ms")
emit(it)
}
}
flow
.collectLatest {
try {
log("collect $it start -> ${System.currentTimeMillis() - startTime}ms")
delay(300)
log("collect $it end -> ${System.currentTimeMillis() - startTime}ms")
} catch (e: Exception) { log("$it throw ${e::class.simpleName} -> ${System.currentTimeMillis() - startTime}ms") }
}
}
위의 `conflate`와 동일한 코드인데, `collectLatest` 블록에서 예외를 잡아 로그를 출력하는 코드를 추가했다. 위에서 설명했듯이 새롭게 배출되는 값이 있다면 기존에 수집하던 코루틴을 중지시키기 때문이다. 코루틴의 중지는 예외를 통해 중지가 되기 때문에, 실제 예외가 발생한다면 이 코루틴이 중지되었음을 알 수 있다. 한번 결과를 살펴보자.
[main @coroutine#2] emit 1 -> 126ms
[main @coroutine#3] collect 1 start -> 130ms
[main @coroutine#2] emit 2 -> 235ms
[main @coroutine#3] 1 throw ChildCancelledException -> 284ms
[main @coroutine#4] collect 2 start -> 286ms
[main @coroutine#2] emit 3 -> 387ms
[main @coroutine#4] 2 throw ChildCancelledException -> 387ms
[main @coroutine#5] collect 3 start -> 387ms
[main @coroutine#2] emit 4 -> 489ms
[main @coroutine#5] 3 throw ChildCancelledException -> 490ms
[main @coroutine#6] collect 4 start -> 490ms
[main @coroutine#2] emit 5 -> 592ms
[main @coroutine#6] 4 throw ChildCancelledException -> 592ms
[main @coroutine#7] collect 5 start -> 592ms
[main @coroutine#7] collect 5 end -> 898ms
1 ~ 4까지는 `ChildCancelledException`이 발생하여 코루틴이 중지된 것을 볼 수 있다. 중지되는 시점도 자세히 보면, 다음 값이 배출되는 시점과 동일하다. 최종적으로 5가 배출될 때에는 다른 값이 배출되지 않기에 수집이 완료되어 end 로그까지 남는 것을 확인할 수 있다.
여러 코루틴의 조합
동기식 코드에서 `Collection`을 사용할 때, 두 `Collection`을 더하거나, 공통된 값을 구한다거나 하는 등의 조합을 할 수 있었다. 코루틴의 플로우도 역시 조합할 수 있다.
zip
`Sequence`의 `zip`함수와 유사하다. 두 개의 `Flow`에서 배출되는 상응되는 값을 조합한다.
fun main() = runBlocking {
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a - $b" }
.collect { println(it) }
}
1 - one
2 - two
3 - three
`zip`은 둘 중 하나의 `Flow`가 완료된다면, 남은 `Flow`는 취소된다. 위 코드에서 만약 `nums`가 총 4번을 배출하는 Flow였다면, 마지막 4는 `strs`이 완료되었기 때문에 `nums`는 취소되어 배출되지 않는다.
combine
`combine`역시 두 `Flow`를 조합할 때 사용한다. `zip`은 두 `Flow`간의 상응하는 값을 조합하는 반면, `combine`은 하나의 `Flow`에서 배출될 때마다 블록이 실행된다. 한번 예시를 보자.
fun main() = runBlocking {
val nums = (1..5).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
nums.zip(strs) { a, b -> "$a - $b" }
.collect { value ->
println("$value -> ${System.currentTimeMillis() - startTime}ms")
}
}
두 `Flow`를 `zip`으로 조합한다면 무조건 상응하는 값을 조합한다. `nums`는 300ms마다 한번, `strs`는 400ms마다 한번 배출하기 때문에 `nums`에서 값을 먼저 배출한다고 해도 `strs`에서 값을 배출할 때까지 조합되지 않는다. 게다가 위에서 설명했듯, `nums`에서 매출하는 4, 5는 `strs`가 완료되었기 때문에 조합되지 않는다.
1 - one -> 413ms
2 - two -> 818ms
3 - three -> 1223ms
결과를 확인해보면 수집이 모두 `strs`가 배출하기까지 소요되는 시간인 400ms 간격으로 된 것을 볼 수 있다. 그렇다면 `zip`을 `combine`으로 단순히 바꿔보자.
fun main() = runBlocking {
val nums = (1..5).asFlow().onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
nums.combine(strs) { a, b -> "$a - $b" }
.collect { value ->
println("$value -> ${System.currentTimeMillis() - startTime}ms")
}
}
1 - one -> 417ms
2 - one -> 626ms
2 - two -> 818ms
3 - two -> 927ms
3 - three -> 1224ms
4 - three -> 1228ms
5 - three -> 1534ms
출력을 보면 우선 첫번째 수집은 `strs`가 처음으로 배출하는 시점에 되었다. 하지만 이후에 `nums`가 2를 배출할 때를 보면, `strs`가 아직 two를 배출하지 않았기 때문에 one과 조합되어 배출된 것을 볼 수 있다. 두 `Flow`중 하나의 `Flow`에서 배출이 되면 `combine`블록이 실행되고, 아직 배출되지 않은 `Flow`의 값은 가장 최근에 배출된 값을 조합한다.
평탄화
공식 문서에서는 flattening이라고 표현하고 있어서 평탄화라고 했는데, 사실 워딩만 보면 뭔지 이해하기 힘들다. 그냥 가장 단순하게 `Iterable`의 `flatMap`을 생각하면 된다. `flatMap`은 블록 안에서 생성된 모든 요소를 하나의 `Iterable`로 변환해주는 함수이다. 코루틴의 flattening도 마찬가지이다. `Flow<Flow<T>>`타입을 하나의 `Flow<T>`로 만들어주는 방법에 대해 평탄화라는 워드로 표현했다. 다만, 비동기적으로 여러 요소들이 계산되는 만큼 `flatMap`보다는 다양한 방법이 필요하기에 코틀린에서는 총 3가지 API를 제공한다고 한다.
이제 3가지 API를 비교할 것인데 `Flow<Flow<T>>`에서 밖을 감싸는 `Flow`를 "외부 `Flow`", 외부 `Flow`의 타입 파라미터로 선언된 `Flow`를 "내부 `Flow`" 라는 용어로 설명을 할 것이다.
flatMapConcat
순차적으로 평탄화하는 함수이다. 외부 `Flow`가 배출될 때마다 내부 `Flow`가 순차적으로 배출된다.
fun main() = runBlocking {
val outer = (1..3).asFlow().onEach { delay(500) }
val inner = { num: Int -> flowOf("$num - one", "$num - two", "$num - three")
.onEach { delay(1000) } }
val startTime = System.currentTimeMillis()
outer
.flatMapConcat { inner(it) }
.collect { log("$it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms") }
}
외부에는 1 ~ 3을 배출하는`outer`가 있고, 내부에는 문자열을 배출하는 `inner`함수를 통해 만들어지는 `Flow`가 생성된다. 순차적으로 `outer`에서 한번 값이 배출되고, flatMapConcat 내부 블록에서 반환하는 `Flow`가 한번 배출된다. 결국 외부 `Flow`에서 배출되는 만큼 내부 `Flow`가 실행된다. 한번 결과를 보자
[main @coroutine#1] 1 - one -> 1500ms
[main @coroutine#1] 1 - two -> 2500ms
[main @coroutine#1] 1 - three -> 3500ms
[main @coroutine#1] 2 - one -> 5000ms
[main @coroutine#1] 2 - two -> 6000ms
[main @coroutine#1] 2 - three -> 7000ms
[main @coroutine#1] 3 - one -> 8500ms
[main @coroutine#1] 3 - two -> 9500ms
[main @coroutine#1] 3 - three -> 10500ms
외부 `Flow`는 배출까지 500ms가 걸리고, 내부 `Flow`는 1000ms가 걸린다. 따라서 첫번째 수집되는 시간은 1500ms로 확인된다. 두 번째 수집되는 시간은 내부 `Flow`의 소요시간인 1000ms가 추가되어 2500ms에 수집된 것을 볼 수 있다. 이렇게 외부 -> 내부 순차적으로 배출하며 실행되는 `Flow`가 배출이 끝날 때 까지 기다리며 평탄화하는 경우에는 `flatMapConcat`을 사용하면 된다.
참고로 `Flow<Flow<T>` 타입의 확장함수로 `flattenConcat`가 있다. 이를 사용하면 `flatMapConcat`과 동일한 방식으로 `Flow<T>`로 변환해준다. 즉, `map { }.flattenConcat()`을 합치면 `flatMapConcat { }`이 된다.
flatMapMerge
`flatMapMerge`는 외부, 내부 `Flow`를 순차적으로, 실행중인 `Flow`가 완료될 때 까지 기다리지 않고 동시에 실행하며 병합한다. 바로 예제부터 보자.
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val outer = (1..3).asFlow()
.onEach {
delay(500)
log("[emit outer] $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms")
}
val inner = { num: Int ->
flowOf("$num - one", "$num - two", "$num - three")
.onEach {
delay(1000)
log("[emit inner] $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms")
}
}
outer
.flatMapMerge { inner(it) }
.collect { log("[collect] $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms") }
}
`flatMapConcat`을 설명할 때와 동일한 코드인데, 동작 방식을 이해하기 위해 외부, 내부 Flow가 배출될 때 각각 로그를 추가했다. 로그부터 살펴보자.
[main @coroutine#2] [emit outer] 1 -> 500ms
[main @coroutine#2] [emit outer] 2 -> 1000ms
[main @coroutine#3] [emit inner] 1 - one -> 1500ms
[main @coroutine#1] [collect] 1 - one -> 1500ms
[main @coroutine#2] [emit outer] 3 -> 1500ms
[main @coroutine#4] [emit inner] 2 - one -> 2000ms
[main @coroutine#1] [collect] 2 - one -> 2000ms
[main @coroutine#3] [emit inner] 1 - two -> 2500ms
[main @coroutine#5] [emit inner] 3 - one -> 2500ms
[main @coroutine#1] [collect] 1 - two -> 2500ms
[main @coroutine#1] [collect] 3 - one -> 2500ms
[main @coroutine#4] [emit inner] 2 - two -> 3000ms
[main @coroutine#1] [collect] 2 - two -> 3000ms
[main @coroutine#3] [emit inner] 1 - three -> 3500ms
[main @coroutine#5] [emit inner] 3 - two -> 3500ms
[main @coroutine#1] [collect] 1 - three -> 3500ms
[main @coroutine#1] [collect] 3 - two -> 3500ms
[main @coroutine#4] [emit inner] 2 - three -> 4000ms
[main @coroutine#1] [collect] 2 - three -> 4000ms
[main @coroutine#5] [emit inner] 3 - three -> 4500ms
[main @coroutine#1] [collect] 3 - three -> 4500ms
보면 우선 `outer`가 500ms를 소요하여 1을 배출한다. 이후 `outer`는 `inner`를 기다리지 않고 500ms 뒤에 2를 배출하는 것을 볼 수 있다. 그 이후에 보면 `inner`는 1을 배출했던 시간 1000ms 뒤에 1 - one을 배출하는 것을 볼 수 있다. `inner`가 배출된 이후 바로 1 - one이 수집된다.
그림으로 수집되는 시간을 계산해보면 위와 같을 것이다. `outer`에서 1을 배출하면 `inner`가 실행되어 1000ms 간격으로 배출한다. 그와 동시에 `outer`는 계속 동시에 실행이 되며 500ms뒤에 2를 배출한다. 2를 배출하는 순간 `inner`가 실행되어 1000ms 간격으로 배출한다. 이렇게 `Flow`간 순차적으로 진행되지 않고, 동시에 평탄화하려면 `flatMapMerge`를 사용하면 된다.
로그를 확인하면 동시에 실행된다고 해서 여러 개의 코루틴을 사용하는 것은 아닌 것 같다. 하나의 코루틴에서 중단과 재개를 통해 동시에 실행되는 것처럼 보인다.
`flattenConcat`과 마찬가지로 `Flow<Flow<T>>` 타입의 확장함수 `flattenMerge` 도 있다.
flatMapLatest
`flatMapLatest`는 외부 `Flow`에서 새로운 값이 배출되면, 이전에 실행중이던 내부 `Flow`를 중지시키면서 평탄화하는 함수이다. 마치 이전에 봤던 `collectLatest`와 동작 방식이 비슷하다.
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val outer = (1..3).asFlow()
.onEach {
delay(500)
log("[emit outer] $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms")
}
val inner = { num: Int ->
flow {
(1..3).forEach {
try {
delay(300)
emit("$num - $it")
log("[emit inner] $num - $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms")
} catch (e : Exception) {
log("[${e::class.simpleName}] $num - $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms")
throw e
}
}
}
}
outer
.flatMapLatest { inner(it) }
.collect { log("[collect] $it -> ${(System.currentTimeMillis() - startTime) / 100 * 100}ms") }
}
코드가 좀 바꼈는데, `inner`에서도 1 ~ 3 까지 배출하고 300ms로 딜레이를 수정했다. 그리고 중간에 예외를 잡아서 로그를 출력하도록 변경했다. 한번 결과를 보자.
[main @coroutine#2] [emit outer] 1 -> 500ms
[main @coroutine#3] [emit inner] 1 - 1 -> 800ms
[main @coroutine#1] [collect] 1 - 1 -> 800ms
[main @coroutine#2] [emit outer] 2 -> 1000ms
[main @coroutine#3] [ChildCancelledException] 1 - 2 -> 1100ms
[main @coroutine#4] [emit inner] 2 - 1 -> 1400ms
[main @coroutine#1] [collect] 2 - 1 -> 1400ms
[main @coroutine#2] [emit outer] 3 -> 1600ms
[main @coroutine#4] [ChildCancelledException] 2 - 2 -> 1600ms
[main @coroutine#5] [emit inner] 3 - 1 -> 1900ms
[main @coroutine#1] [collect] 3 - 1 -> 1900ms
[main @coroutine#5] [emit inner] 3 - 2 -> 2200ms
[main @coroutine#1] [collect] 3 - 2 -> 2200ms
[main @coroutine#5] [emit inner] 3 - 3 -> 2500ms
[main @coroutine#1] [collect] 3 - 3 -> 2500ms
`outer`에서 1을 배출하고 300ms 후 `inner`를 통해 생성된 `Flow` 에서 1을 배출한다. 배출과 동시에 평탄화되어 수집도 된다. 이후 outer 에서 2를 배출하면, 이전에 1을 배출해서 실행된 내부 `Flow` 는 취소된다. 따라서 로그 5번째 라인을 보면 1 - 2는 코루틴을 취소시키는 예외가 발생된 것을 볼 수 있다.
정리하면, `flatMapMerge`처럼 외부 `Flow`와 내부 `Flow`를 순차적으로 이전 `Flow`가 완료될 때 까지 기다리지 않는다. 그런데 외부 `Flow`가 완료되면, 해당 외부 `Flow`에서 실행된 내부 `Flow`는 취소시킨다.
뭔가 latest라는 단어가 들어가서 두 `Flow`간 가장 최신 값만을 수집할 거 같은데 사실은 위와 같이 동작하니, 동작 방식을 유의해서 사용해야 할 것 같다. 참고로 코루틴을 중지시키기 때문에 출력을 보면 각각의 `Flow`를 다른 코루틴에서 실행시키는 것도 확인할 수 있다. 이러한 방식이 `collectLatest`와 비슷하다.