본문 바로가기
Android/Kotlin

[Kotlin/coroutine] 6. Asynchronous Flow (2)

by 겸 2023. 10. 26.

Composing multiple flows

multiple flow를 구성하는 여러 방법을 알아보자.

Zip

Kotlin 표준 라이브러리에 있는 Sequencezip처럼 flow도 두개의 flow를 합치기 위한 zip연산자가 있다.

 

코드

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print

실행 결과

1 -> one
2 -> two
3 -> three

Combine

flow가 변수나 작업의 가장 최근의 값을 나타내는 경우, 해당 flow의 가장 최근의 값에 따라 계산을 수행하고 upstream이 발생할 때마다 이를 다시 계산해야할 수도 있다.

 

예를 들어, 만약 이전 예제의 숫자가 300ms마다 업데이트되지만, 문자열은 400ms마다 업데이트되는 경우 zip연산자를 사용하여 압축하면 400ms마다 출력되더라도 동일한 결과가 나온다.

 

코드

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.**zip**(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

실행 결과

1 -> one at 436 ms from start
2 -> two at 836 ms from start
3 -> three at 1240 ms from start

하지만 combine을 사용한다면 어떻게 될까?

 

코드

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.**combine**(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }

실행 결과

1 -> one at 446 ms from start
2 -> one at 649 ms from start
2 -> two at 848 ms from start
3 -> two at 950 ms from start
3 -> three at 1249 ms from start

nums나 strs flow에서 각 emission이 일어날 때마다 가장 최근의 값으로 출력된다!


Flattening flows

flow는 비동기적으로 수신된 값을 나타내므로 또다른 sequence값에 대한 요청을 trigger하는 상황이 발생하기 쉽다. 예를 들어 떨어진 두 문자열의 flow를 리턴하는 함수를 실행해보자.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

3개의 정수 flow가 있고, requestFlow를 호출한다면?

(1..3).asFlow().map { requestFlow(it) }

Flow<Flow<String>>형태로 flow가 생성되므로 single flow로 만들기 위해 flattened작업을 해줘야 한다.

이를 flatten하기 위해 flatten, flatMap계열의 연산자가 있다.

flow의 비동기 특성으로 다양한 모드가 필요하므로 여러가지 연산자가 존재한다.

flatMapConcat

flow들의 흐름을 연결하는 것은 flatMapConcat연산자를 이용할 수 있다.

다음 flow를 시작하기 전, 내부 flow가 완료되는 것을 기다린다.

 

코드

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

실행 결과

1: First at 150 ms from start
1: Second at 656 ms from start
2: First at 760 ms from start
2: Second at 1265 ms from start
3: First at 1366 ms from start
3: Second at 1872 ms from start

flatMapMerge

모든 flow를 동시에 수집하고 해당 값을 하나의 flow로 병합하여 값이 가능한 빨리 방출되게 할 수 있다.

flow가 동시에 수집될 때 동시 flow의 개수를 제한하는 concurrency 매개변수를 설정할 수 있다.

 

코드

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

실행 결과

1: First at 163 ms from start
2: First at 259 ms from start
3: First at 361 ms from start
1: Second at 663 ms from start
2: Second at 760 ms from start
3: Second at 863 ms from start

flatMapMerge{ requestFlow(it) } 에서 flow를 순차적으로 호출하고 그와 동시에 결과 flow를 수집한다.

flatMapLatest

collectLatest와 비슷한 연산자로 새로운 flow가 방출되면 이전의 수집이 취소된다.

 

코드

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .**flatMapLatest** { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

실행 결과

1: First at 180 ms from start
2: First at 304 ms from start
3: First at 406 ms from start
3: Second at 906 ms from start

새로운 값을 받을 때마다 이전 코드를 취소하는 것을 볼 수 있다.


Flow exceptions

flow 수집은 연산자 내부의 emitter나 코드에서 예외가 발생할 때 exception을 throw하고 완료된다.

예외를 처리하는 여러가지 방법을 알아보자.

collector try and catch

collector는 코틀린의 try/catch 블록을 사용해 예외를 처리할 수 있다.

 

코드

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

실행 결과

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

이 코드는 collect연산자에서 예외를 성공적으로 catch했으며 그 후에넌 더이상 값이 방출되지 않는다.

Everything is caught

이전 예제는 중간 또는 터미널 연산자에서 발생하는 모든 예외를 자븐다.

예를들어 내보낸 값이 string으로 매핑되도록 코드를 변경해보자.

 

코드

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

실행 결과

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

Exception transparency

emitter의 코드가 예외 처리 동작을 어떻게 캡슐화할 수 있을까?

 

flow는 예외에 투명해야한다. try/catch블록 내부에서 flow빌더의 값을 내보내는 것은 예외 투명성을 위반하는 것이다. try/catch를 사용해서 collector가 예외를 throw하는 것은 항상 catch할 수 있음을 보장한다.

 

emitter는 예외 투명성을 준수하고 예외 처리를 캡슐화할 수 있는 catch 연산자를 사용할 수 있다. catch연산자의 내부 코드는 예외를 분석하고 어떤 예외가 잡혔는지에 따라 다른 방식으로 반응할 수 있다.

  1. 예외를 throw를 사용해서 다시 발생시키기
  2. 예외를 catch내부의 emit을 사용하여 값으로 변환할 수 있음
  3. 예외를 무시하거나 기록하거나 다른코드에서 처리할 수 있음

코드

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

실행 결과

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

위 방식을 사용하면 try/catch블록이 없더라도 동일한 결과를 출력할 수 있다.

Transparent catch

catch 중간 연산자는 upstream예외만 잡는다. 즉, catch위에있는 연산자에 대해서는 예외를 발생시키지만 아래는 잡을 수 없다.

 

아래 코드에서 collect { … } 블록은 catch아래에 있으므로 예외를 잡을 수 없다.

 

코드

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

실행 결과

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
    at ...

catch연산자가 있지만, Catch $e의 결과가 출력되지 않는 것을 볼 수 있다.

Catching declaratively

선언적으로 catch하기

 

catch연산자가 모든 예외를 처리하려는 특성을 이용해 catch연산자 이전에 collect연산자의 내부 코드를 onEach내부로 이동시켜보자.

이런 flow의 수집은 파라미터 없이 collect()호출에 의해 트리거된다.

 

코드

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()

실행 결과

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

이제 try/catch문을 사용하지 않아도 Caught 메세지가 출력되는 것을 볼 수 있다.


Flow completion

flow의 수집이 완료될 때 작업을 수행해야할 수도 있다.

이는 imperative명령형 혹은 declarative선언형으로 실행 할 수 있다.

Imperative finally block

try/catch외에 collector는 collect가 완료 시 finally블록을 사용할 수 있다.

 

코드

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

실행 결과

1
2
3
Done

이 코드는 3가지 숫자를 출력한 뒤, Done을 출력한다.

Declarative handling

선언형 접근법은 flow가 수집 완료 시 발생하는 onCompletion 중간 연산자가 있다.

이전의 예를 onCompletion을 사용해서 다시 작성해보자.

 

코드

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

실행 결과

1
2
3
Done

마찬가지로 3가지 숫자를 출력한 뒤, Done을 출력한다.

 

onCompletion의 주요 장점은 람다의 nullable Throwable 파라미터가 flow collection이 정상적으로 완료되었는지 확인할 수 있다.

 

코드

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

실행 결과

1
Flow completed exceptionally
Caught exception

catch와 달리 onCompletion연산자는 예외를 처리하지 않으며 예외는 여전히 downstream으로 흐른다.

이 예외는 onCompletion 연산자에 전달되고 catch 연산자로 처리된다.

Successful completion

onCompletioncatch연산자의 또다른 차이점

onCompletion은 모든 예외를 볼 수 있으며, upstream flow가 성공적으로 완료된 후에는 null 예외를 받는다.

 

코드

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

실행 결과

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

위 코드의 실행 결과, check에서 예외로 인해 flow가 중단되었기 때문에 완료 원인이 null이 아니라는 것을 볼 수 있다.

 

끝까지 완료된 경우는 어떤 결과가 나오게 될까?

 

코드

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 3) { "Collected $value" }
            println(value)
        }
}

실행 결과

1
2
3
Flow completed with null

flow가 끝까지 실행되었으므로 onCompletion의 결과 null이 출력된다.


Launching flow

flow를 사용해서 어떤 소스로부터 발생하는 비동기 이벤트를 나타내는 것은 쉽다.

들어오는 이벤트에 대한 반응으로 실행 할 코드를 등록하는 것과 계속해서 다른 작업을 수행하는 것이 필요하다.

onEach 연산자가 이 역할을 맡는다.

onEach는 중간 연산자이므로 flow의 collect를 수행하기 위한 터미널 연산자도 필요하다.

onEach 다음에 collect 터미널 연산자를 사용하면 collect되는 것을 기다린 후 이후의 코드가 실행된다.

 

코드

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    ****events()
        **.onEach** { event -> println("Event: $event") }
        .**collect()** // <--- Collecting the flow waits
    println("Done")
}

실행 결과

Event: 1
Event: 2
Event: 3
Done

즉, 각 이벤트 출력마다 100ms가 소요되고 300ms 이후에 Done이 출력된다.

 

만약 여기서 collect대신 launchIn 터미널 연산자를 사용한다면 flow의 수집을 별도의 코루틴에서 시작할 수 있으므로 다음 코드의 실행을 계속한다.

 

코드

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}

실행 결과

Done
Event: 1
Event: 2
Event: 3

이벤트는 100ms마다 출력되고, 그와 함께 flow 이후의 코드인 Done이 출력된다.

 

launchIn 파라미터는 반드시 구체적인 flow를 수집하는 코루틴을 시작 할 CoroutineScope를 지정해야 한다.

위의 예에서는 main함수의 코루틴 스코프인 runBlocking코루틴 빌더에서 나온다.

따라서 flow가 실행되는 동안 runBlocking스코프는 자식 코루틴이 완료를 기다리며 main함수의 리턴을 막는다.

 

launchInJob을 리턴하므로 전체 코루틴의 취소시키지 않고 해당 flow 수집 코루틴만 취소하거나 join할 수 있다.

Flow cancellation checks

편의를 위해 flow빌더는 각 방출된 값의 취소에 대한 추가적인 검사를 할 수 있다.

flow빌더에서 발생하는 루프가 취소 가능하다는 것을 의미한다.

 

코드

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) **cancel()**  
        println(value)
    } 
}

실행 결과

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

숫자 4를 emit할 때 CancellationException이 발생한다.

 

하지만 대부분의 flow연산자는 성능을 이유로 추가적인 취소 확인을 수행하지 않는다.

예를들어 IntRange.asFlow를 사용한 동일한 루프에서 중단하지 않으면 취소 체크를 하지 않는 것을 볼 수 있다.

 

코드

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

실행 결과

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

모든 숫자가 수집되고, runBlocking이 리턴되기 전에만 취소 체크를 한다.

Making busy flow cancellable

위의 경우 명시적으로 취소를 확인해야 한다.

onEach { currentCoroutineContext().ensureActive() }를 추가하거나 cancellable()연산자를 사용하면 된다.

 

코드

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

실행 결과

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

cancellable연산자를 사용한 결과 1부터 3까지만 수집된 후 cancel되었으므로 이를 체크하여 예외 발생 후 이후의 숫자는 출력되지 않는다.

반응형