코루틴 관련 포스팅은 아래의 포스팅들을 같이 순차적으로 참고하시면 좋습니다. 클릭하면 해당 포스팅으로 이동합니다.
1. 코루틴 기본 개념
2. 코루틴 빌더
3. Job과 코루틴의 라이프 사이클
4. 코루틴 컨텍스트와 스코프
5. Flow 기본 개념
6. Flow 핵심 API 정복하기
7. Flow 제어하기 (예외, 완료 등)
이전에 봤던 `async` 빌더를 사용해서 코루틴을 만들면, `await`을 통해 단일 값을 반환받을 수 있었다. 그렇다면 코루틴을 통해 여러 값들을 비동기 연산으로 반환받는 방법도 있을까? 당연히 있다. 이는 `Flow`을 통해서 가능하다. 이번 포스팅에서는 `Flow`에 대한 기본적인 문법, 특징들을 알아보고 다음 포스팅에 실제 주로 사용하는 API들을 살펴보자.
Flow란?
`Flow`는 비동기로 처리되어야 할 값들의 스트림을 나타내는 타입이다. 단어 그대로 값들의 "흐름"이라고 생각하면 된다. 대표적으로 Kotlin에서 여러 값들을 나타내는 `List`를 생각해보자. `List`는 여러 값들의 연산이 모두 끝난 후의 연산 결과를 담지만, `Flow`는 여러 연산의 흐름을 나타낸다. 한번 아래의 코드를 보자.
fun main() {
println(
measureTimeMillis {
val list = (1..3).map {
Thread.sleep(1000)
it * it
}
}
)
}
1부터 3까지의 제곱을 리스트로 만드는 코드이다. 그런데 각 연산이 1초씩 걸린다고 가정하고 만들었다. 총 소요시간은 몇이 출력될까? 예상처럼 대략 3초 정도 걸린다. 이 코드를 `Flow`로 만들어보자.
suspend fun main() {
log(
measureTimeMillis {
val flow = flow {
(1..3).forEach {
Thread.sleep(1000)
emit(it * it)
}
}
}
)
}
동일한 코드를 `Flow`로 만들어보았다. 동일하게 실제 소요 시간이 3초 정도일까? 아니다. 1ms가 소요된다. List처럼 Flow는 결과 값의 집합이 아니라, 연산의 흐름 그 자체를 표현하는 것이기 때문에 결과를 위해 실제 연산을 수행하지 않기 때문이다. (마치 `Kotlin`의 `Sequence`와 비슷하다. Sequence에 대해선 해당 글 참조)
Flow 기본 문법
우선 `Flow`의 기본 문법에 대해서 살펴본 후 그 특징에 대해서 알아보자. 위에서 사용했던 코드를 기반으로 기본적인 문법을 알아보자.
suspend fun main() {
val flow = flow { // flow 생성
(1..3).map {
Thread.sleep(1000)
emit(it * it)
}
}
flow
.map { "${sqrt(it.toDouble()).toInt()}^2 = $it" } // flow의 중간 연산자
.collect { log(it) } // flow의 종결 연산자
}
우선 `Flow`는 `flow { }`를 통해 생성할 수 있다. 블록 안에 연산을 정의하면 된다.
다른 방법으로는 `Collection`, `Sequence` 타입에서 `asFlow` 확장함수를 통해 Flow를 만들 수도 있고, `flowOf`함수를 통해서도 `List`를 생성하는 것처럼 만들 수 있다.
이후 해당 블록에서 `emit`을 통해 값을 배출할 수 있다. `emit`을 통해 배출된 값은 추후에 `Flow`의 종결 연산자를 통해 수집할 수 있다.
`Flow`는 Java의 `Stream`처럼 중간 연산자, 종결 연산자를 제공한다. 가장 기본적인 종결 연산자는 `collect`이다. `FlowCollector`라는 SAM 인터페이스를 파라미터로 받기 때문에 위와 같이 람다로 구현을 해주어서 사용한다.
Flow의 특징
Cold Stream
`Flow`는 `Sequence`와 유사하게 종결 연산이 수행되지 않으면 실행되지 않는다. 이렇게 실제 소비자가 소비하기 전 까지는 실행되지 않는 스트림을 Cold Stream이라고 부른다. (Cold Stream에 대한 더 자세한 개념은 다른 문서를 검색하는 것을 권장한다)
위에서 `Flow`에 대해 설명했을 때 `List`와 비교했던 코드를 살펴보면, `Flow`는 도합 3초가 걸리는 연산을 블록에 정의했지만 정작 `Flow`를 생성하는 시간인 1ms가 걸렸다. 이는 실제 `Flow`를 종결연산으로 수집하지 않는 이상 실행되지 않음을 알 수 있다. 실제 검증을 위해 아래 코드를 보자
suspend fun main() {
val flow = flow {
log("flow start")
(1..3).map {
Thread.sleep(1000)
emit(it * it)
}
}
log("before collect")
flow.collect()
}
만약 `flow`가 선언과 즉시 실행이 된다면 flow start라는 로그가 먼저 남아야 할 것이다. 하지만 결과는 before collect라는 로그가 먼저 남는다.
이러한 특성 때문에 `Flow`를 생성하는 `flow { }`함수 자체는 중단 함수가 아니다. 실제 생성만 한다고 실행되지 않기 때문이다. 다만 `emit`, `collect`와 같은 함수는 중단함수이기 때문에 다른 중단함수 내에서만 실행이 가능하다.
취소에 협력적
`Job`의 `cancle`처럼 `Flow`를 직접 취소하는 지점을 제공하진 않지만, 일반적인 코루틴 처럼 취소에 협력적인건 맞다.
suspend fun main() {
val flow = flow {
(1..3).map {
Thread.sleep(1000)
emit(it * it)
}
}
val result = withTimeoutOrNull(2200) {
flow.collect { log(it) }
}
if (result == null) log("flow cancelled")
}
`withTimeoutOrNull`을 통해 2초 이후에는 취소가 되는지 확인을 해보자. 취소가 되는게 맞다면 flow cancelled라는 로그가 출력되어야 한다.
[main] flow start
[main] 1
[main] 4
[main] flow cancelled
결과는 역시 로그가 출력되었다. `Flow`는 취소에 협력적이다.
순차적으로 진행
`Flow`는 `Sequence`처럼 순차적으로 진행된다.(참고) `List`의 경우에는 모든 요소에 대해서 하나의 연산을 수행하고 다음 연산으로 넘긴다. 반면 `Sequence`는 하나의 요소에 대해서 모든 연산을 수행한다. `Flow`도 마찬가지이다.
suspend fun main() {
(1..3).asFlow()
.filter {
log("filter $it")
it % 2 == 0
}.map {
log("map $it")
"$it"
}.collect {
log("collect $it")
}
}
[main] filter 1
[main] filter 2
[main] map 2
[main] collect 2
[main] filter 3
결과를 보면 filter 1, 2, 3이 먼저 출력되지 않는다. 각 요소에 대해서 모든 연산이 한번씩 순차적으로 진행되는 것을 볼 수 있다.
Flow의 컨텍스트
코루틴은 항상 특정 컨텍스트를 기반으로 실행된다. `Flow` 역시 마찬가지이다. `Flow`도 코루틴이기 때문에 컨텍스트를 기반으로 실행되며 기본적으로는 `Flow`의 연산이 실행되도록 해당 `Flow`의 종결 연산을 호출하는 코루틴의 컨텍스트에서 실행된다.
fun main() = runBlocking {
var flow = (1..3).asFlow()
val thread1 = newSingleThreadContext("Thread-1")
val thread2 = newSingleThreadContext("Thread-2")
withContext(thread1) { // 코루틴1
log("thread1 start")
flow = flow.map {
log("map $it")
it * it
}
}
withContext(thread2) { // 코루틴2
log("thread2 start")
flow.collect { log("collect $it") }
}
thread1.close()
thread2.close()
}
위 코드를 한번 보자. 1 부터 3까지의 플로우를 만들었고 단일 스레드 컨텍스트를 2개를 만들어 각각을 컨텍스트로 가지는 코루틴 2개를 만들었다. 코루틴1 에서는 중간 연산자 `map`을 통해 `flow`를 변경해주었고, 코루틴2 에서는 종결 연산자 `collect`를 통해 값을 모았다.
[Thread-1 @coroutine#1] thread1 start
[Thread-2 @coroutine#1] thread2 start
[Thread-2 @coroutine#1] map 1
[Thread-2 @coroutine#1] collect 1
[Thread-2 @coroutine#1] map 2
[Thread-2 @coroutine#1] collect 4
[Thread-2 @coroutine#1] map 3
[Thread-2 @coroutine#1] collect 9
결과를 보니, `map`과 `collect` 블록에 선언된 모든 연산이 코루틴2 컨텍스트에서 실행된 것을 볼 수 있다.
`map`은 중간연산자이기 때문에, 그냥 변경된 흐름 자체를 반환하는 역할만 수행한다. 실제 `map` 블록에 정의된 연산을 수행하는 건 아니다. 따라서 실제 연산 자체는 종결 연산자 `collect`를 호출할 때 진행되는데, `collect`를 호출하는 코루틴은 코루틴2 이기 때문에 이 컨텍스트에서 `flow`가 실행되는 것이다. 이러한 속성을 공식 문서에서는 "context preservation" 이라고 칭하고 있다.
배출 컨텍스트 변경
어떤 `Flow`를 실행해야 하는데, 해당 `Flow`를 다른 컨텍스트에서 실행하고 싶을 경우가 있다. 예를 들어, UI 작업을 위해 `Dispacher.Main` 에서 실행해야 한다거나, IO 작업을 수행해야 해서 `Dispacher.IO`에서 실행하고 싶을 수도 있다.
그런데 위에서 살펴봤듯이, `Flow`는 해당 `Flow`의 종결 연산을 호출하는 컨텍스트에서 실행된다. 이를 변경하려면 어떻게 해야 할까? 다음과 같은 방법을 생각해볼 수 있다.
fun main() = runBlocking {
val flow = flow {
withContext(Dispatchers.IO) {
(1..3).forEach {
delay(1000)
emit(it)
}
}
}
flow.collect { log("collect $it") }
}
위와 같이 `Flow` 내부에서 컨텍스트를 변경해서 작성할 수 있을 것이다. 하지만 이는 Flow invariant is violated 예외가 발생한다. `Flow`는 위에서 말한 context preservation 속성을 준수하기 때문에, 다른 컨텍스트에서 `emit` 할 수 없기 때문이다.
위 방법 대신, `flowOn`을 통해 컨텍스트를 변경해야 한다.
fun main() = runBlocking {
val singleThreadContext = newSingleThreadContext("single-thread")
val flow = flow {
(1..3).forEach {
delay(1000)
log("emit $it")
emit(it)
}
}.flowOn(singleThreadContext)
flow.collect { log("collect $it") }
singleThreadContext.close()
}
싱글 스레드 컨텍스트를 만든 다음 `flowOn`을 통해서 해당 Flow의 연산을 이 컨텍스트에서 진행하도록 했다. 그리고 `runBlocking` 블럭에서 `collect`를 하면서 로그를 찍어보자.
[main @coroutine#1] default
[single-thread @coroutine#2] emit 1
[main @coroutine#1] collect 1
[single-thread @coroutine#2] emit 2
[main @coroutine#1] collect 2
[single-thread @coroutine#2] emit 3
[main @coroutine#1] collect 3
위와 같이 `emit`을 하는 컨텍스트는 싱글 스레드를 사용하는 컨텍스트, `collect`를 하는 컨텍스트는 main 스레드를 사용하는 컨텍스트라는 것을 확인할 수 있다.
스코프 변경
만약 `Flow`의 코루틴 스코프 자체를 변경하고 싶다면 다음과 같은 방법을 생각해볼 수 있다.
fun main() = runBlocking {
val singleThreadContext = newSingleThreadContext("other-thread")
val newScope = CoroutineScope(singleThreadContext)
val job = newScope.launch {
(1..3).asFlow()
.onEach {
delay(100)
log("$it")
}.launchIn(newScope)
}
while (job.isActive) {
log("do other tasks...")
delay(100)
}
}
`CoroutineScope()`함수를 통해 새로운 영역을 만든 후, 이 스코프로부터 코루틴을 새로 만들어 여기서 `Flow`를 수집하면 된다. 하지만 이보다 더 간단한 방법은 `launchIn`을 사용하는 것이다.
fun main() = runBlocking {
val singleThreadContext = newSingleThreadContext("other-thread")
val newScope = CoroutineScope(singleThreadContext)
val job = (1..3).asFlow()
.onEach {
delay(100)
log("$it")
}.launchIn(newScope)
while (job.isActive) {
log("do other tasks...")
delay(100)
}
}
`launchIn`은 `collect`대신 사용할 수 있는 종결연산자로, 주어진 스코프에서 해당 `Flow`를 실행시킨다.
만약 스코프를 변경하지 않고, 같은 스코프의 새로운 코루틴에서 실행시키고 싶다면 더 간단하다.
fun main() = runBlocking {
(1..3).asFlow()
.onEach {
delay(100)
log("$it")
}.launchIn(this)
log("do other tasks...")
}
위와 같이 `this`를 통해 현재 스코프를 인자로 넣어주면 된다.
`launchIn`을 사용하는 경우는 보통 `Flow`을 진행시키되, 해당 `Flow`가 완료될 때 까지 기다릴 필요가 없는 경우에 사용한다. 그냥 `collect`를 사용하면 해당 Flow가 모두 `collect`될 때 까지 아래의 코드는 실행되지 않는다. 당연히 하나의 코루틴으로 묶여있기 때문이다. 한번 아래 코드를 실행시켜보자.
fun main() = runBlocking {
(1..3).asFlow()
.onEach {
delay(100)
log("$it")
}.collect()
log("do other tasks...")
}
[main @coroutine#1] 1
[main @coroutine#1] 2
[main @coroutine#1] 3
[main @coroutine#1] do other tasks...
`Flow`가 모두 완료되고 나서 밑에 로그가 남는다. 하지만 `launchIn`을 사용한 코드는 다르다.
[main @coroutine#1] do other tasks...
[main @coroutine#2] 1
[main @coroutine#2] 2
[main @coroutine#2] 3
로그에서 보이듯이 다른 코루틴에서 `Flow`가 실행되기 때문에, 다른 작업에 영향을 미치지 않는다. 그냥 `launch { }`를 통해 새로운 코루틴을 만들고 이 코루틴에서 `Flow`가 실행된다고 생각하면 당연한 결과이다.