본문 바로가기
Android/Coroutine

[Kotlin/coroutine] 5. Asynchronous Flow (1)

by 겸 2023. 10. 25.

중단 함수는 비동기적으로 단일 값을 리턴한다. 하지만 우리는 비동기적으로 계산된 여러개의 값을 어떻게 리턴할 수 있을까? 그래서 Flow가 등장하게 되었다!

Representing multiple values

여러개의 값들은 Kotlin에서 collections를 사용해서 나타낼 수 있다.

예를 들어, 3개의 숫자는 리스트로 리턴하고, forEach를 사용해서 출력할 수 있다.

 

코드

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) } 
}

실행 결과

1
2
3

Sequences

CPU를 사용해서 각 계산에 100ms정도가 소요되는 blocking code에서 계산하는 경우 Sequence를 통해 값을 나타낼 수 있다.

 

코드

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

실행 결과

1
2
3

위의 예시와 동일한 숫자를 출력하지만 각 숫자를 출력할 때마다 100ms를 기다린다.

Suspending functions

하지만 이 계산은 코드를 실행하는 메인스레드를 차단한다.

이 값들이 비동기적으로 계산될 때 suspend 수정자를 표시하면 차단 없이 작업을 수행하고 결과를 리스트로 리턴할 수 있다.

 

코드

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

실행 결과

1
2
3

1초를 기다리고 모든 숫자를 출력한다.


Flows

리턴타입으로 List<Int>를 사용하면 모든 값을 한번에 리턴하는 것을 의미한다.

하지만 비동기적으로 계산된 값을 나타내기 위해 Flow<Int> 타입을 사용할 수있다.

이는 동기적으로 계산된 값을 위한 Sequence<Int>처럼 사용할 수 있다.

 

코드

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

실행 결과

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

위 코드는 메인 스레드를 차단하지 않고 각 숫자를 출력하기 전 100ms를 기다린다.

메인 스레드에서 실행되는 별도의 코루틴마다 I'm not blocked이 출력되는 것으로 확인할 수 있다.

특징

  1. Flow타입의 빌더함수를 flow라고 한다.
  2. flow { … } 빌더 블록 내부의 코드는 중단될 수 있다.
  3. simple()함수는 suspend 수정자를 붙이지 않는다.
  4. emit함수를 사용해서 flow에서 값이 emitted된다.
  5. collect함수를 사용해서 flow에서 값이 collected된다.

Flows are cold

flow는 sequences와 비슷하게 cold흐름이다.

flow builder내부의 코드는 flow가 collected되기 전까지 실행되지 않는다.

 

코드

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

실행 결과

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

이것이 simple() 함수에 suspend 수정자를 붙이지 않는 이유이다!

simple()을 호출 할 때 자기자신을 빠르게 리턴하며 아무것도 기다리지 않는다.

flow는 수집될 때마다 새로 시작하므로 collect를 호출 할 때마다 다시 Flow Start를 출력하게 된다.


Flow cancellation basics

flow는 코루틴이 일반적으로 협력적으로 취소되는 것에 따른다.

즉, flow collection은 flow가 취소가능한 중단 함수에서 중단 될 때 취소된다.

 

코드

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

실행 결과

Emitting 1
1
Emitting 2
2
Done

withTimeoutOrNull블록에서 실행될 때 시간 초과로 인해 코드의 실행이 취소되고 flow가 취소된 것이다.


Flow builders

flow { … } 빌더는 가장 기본적인 빌더이다.

flow를 선언할 수 있는 다른 빌더를 알아보자!

flowOf

고정된 값을 방출하는 flow를 정의하는 빌더이다.

flowOf(1, 2, 3)

.asFlow()

collections와 sequnces에 .asFlow()를 사용하면 flow로 변환할 수 있다.

(1..3).asFlow().collect { value -> println(value) }

Intermediate flow operators

flow의 중간 연산자

 

flow는 중간 연산자를 사용해서 collection과 sequence처럼 변환할 수 있다.

중간 연산자는 upstream flow에 적용되고 downstream flow를 리턴한다.

이 연산자는 cold이고, 중단 함수가 아니다.

빠르게 동작하여 새로 변환된 흐름의 정의를 반환한다.

 

대표적으로 mapfilter가 있다.

sequence와의 차이점은 연산자 내부의 코드 블록이 중단 함수를 호출할 수 있다는 것이다.

예를 들어, 들어오는 요청의 flow는 중단 함수에 의해 오랜 시간동안 실행되는 작업을 수행할 때도 map연산자의 결과로 매핑될 수 있다.

 

코드

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

실행 결과

response 1
response 2
response 3

Transform operator

flow변환 연산자 중에 가장 일반적인 것은 transform이라 불린다.

이는 mapfilter처럼 더 복잡한 변환을 구현하는데 사용할 수 있다.

transform 연산자를 사용해서 임의의 값을 임의의 횟수만큼 내보낼 수 있다.

 

예를들어 transform을 사용하면 장기간 실행되는 비동기요청을 수행하기 전에 문자열을 내보내고 응답을 보낼 수 있다.

코드

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }

실행 결과

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operators

take와 같은 크기 제한 중간 연산자는 제한에 도달하면 flow의 실행을 취소한다.

코루틴에서의 취소는 항상 exception을 던지면서 수행되므로 모든 리소스 관리 함수(try { … } finally { … })가 정상적으로 동작한다.

 

코드

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

실행 결과

1
2
Finally in numbers

위 코드의 실행 결과 flow 내부에서 두번째 숫자를 내보낸 후 중지된 것을 보여준다.


Terminal flow operators

flow에서 터미널 연산자는 flow의 collection의 시작을 중단하는 함수이다.

collection 연산자가 가장 기본적인 연산자이지만, 더 쉽게 만들 수 있는 다른 터미널 연산자도 있다.

  1. toList, toSet
    1. 다양한 컬렉션으로 변환
  2. first()
    1. flow의 첫번째 값을 가져오기
  3. single()
    1. flow가 하나의 값을 내보내는 것을 보장
  4. reduce(), fold()
    1. flow의 값을 줄이기

코드

val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5                           
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)

실행 결과

55


Flows are sequential

여러 flow에서 동작하는 특별한 연산자를 쓰지 않는다면 각 flow의 collection은 순차적으로 수행된다.

collection은 터미널 연산자를 호출하는 코루틴에서 직접 작동한다. 기본적으로 새로운 코루틴이 실행되지 않는다!

emitted된 값들 각각은 upstrea에서 downstream으로 모든 중간 연산자에의해 처리된 후 최종 연산자에게 전달된다.

 

짝수를 필터링하고 이를 문자열에 매핑하는 예제이다.

 

코드

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }

실행 결과

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow context

flow의 collection은 항상 호출하는 코루틴의 context에서 발생한다.

예를들어, 만약 simple이라는 flow가 있을 때 다음의 코드는 flow의 구현 정보와 관계 없이 이 코드 작성자가 지정한 context에서 실행된다.

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}

이런 flow의 특성을 context preservation이라고 한다. (context 보존!)

 

따라서 기본적으로 flow빌더에서 코드는 flow의 collector에 의해 제공된 컨텍스트에서 실행된다.

예를들어 simple함수의 실행을 고려할 때, 호출된 스레드를 출력하고 3개의 숫자를 내보내는 코드를 보자.

 

코드

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}

실행 결과

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect는 메인 스레드에 의해 호출되고, simple flow의 내부 코드도 메인 스레드에 의해 호출된다.

빠르게 실행되는 비동기 코드에 대한 기본 값이며 실행 컨텍스트에 신경쓰지 않고, 호출한 코드를 블록하지 않는다.

A common pitfall when using withContext

하지만 오래 실행되는 CPU를 소비하는 코드는 Dispatchers.Default 컨텍스트에서 실행되어야 하거나 UI 업데이트 코드는 Dispatchers.Main 컨텍스트에서 실행되어야 할 수도 있다.

보통 withContext는 코틀린 코루틴을 사용해서 코드에서 컨텍스트를 변경하는데 사용되지만 flow빌더의 코드는 컨텍스트 보존 특성을 준수해야하고 다른 다른 컨텍스트로부터 emit하는 것이 허용되지 않는다.

 

코드

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}

위 코드는 잘못된 코드이다. 실행 시 에러가 발생한다. flow내에서 withContext로 내보내는 것이 허용되지 않는다.

다른 context로 내보내는 방법을 알아보자.

flowOn operator

flowOn함수를 사용하면 flow 방출 컨텍스트를 변경할 수 있다.

flow의 컨텍스트 변경을 위한 올바른 방법을 나타낸 코드이다.

 

코드

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}

실행 결과

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

메인 스레드에서 collection이 발생할 때, 백그라운드 스레드에서 flow가 어떻게 동작하는지 확인해보자.

flowOn은 flow의 기본적인 특성을 변경한 것을 관찰할 수 있다. emission은 coroutine#2에서 일어나고, collection은 coroutine#1에서 실행된다. flowOn연산자는 컨텍스트에서 Coroutine Dispatcher를 변경해야 할 경우 업스트림 플로우에 대해 다른 코루틴을 만든다.


Buffering

서로 다른 코루틴의 flow에서 다른 부분을 실행하는 것은 장기 실행 비동기 작업이 포함된 경우 flow를 collect하기 위해 걸리는 전체 시간의 관점에서 유용하다.

예를들어, simple flow에 의한 방출이 느린 경우 수집 또한 느리다.

 

코드

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

실행 결과

1
2
3
Collected in 1226 ms

각 출력마다 400ms가 소요된다.

 

하지만 buffer 연산자를 사용한다면 simple의 방출하는 코드와 수집하는 코드를 동시에 실행한다.

 

코드

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            **.buffer()** // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

실행 결과

1
2
3
Collected in 1076 ms

첫 번째 숫자에 대해 100ms만 기다린 후, 다음 숫자를 처리하는데 각각 300ms만 기다리면 된다.

100 + 300 + 300 + 300 으로 약 1000ms가 소요된다.

Conflation

flow가 작업 또는 작업 상태의 업데이트 결과를 나타내는 경우, 각각의 값을 처리할 필요가 없고 가장 최근의 값만 처리할 수 있다.

 

conflate연산자는 collector가 작업을 처리하는 것이 너무 느릴 때, 중간 값을 스킵할 수 있다.

 

코드

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

실행 결과

1
3
Collected in 754 ms

첫번째 숫자가 처리되는 동안 2, 3번째 숫자가 이미 생성되었으므로 2번째 숫자가 conflated되고 가장 최근 숫자인 3만 collector에 전달되었다.

Processing the latest value

conflation은 emitter와 collector모두 느릴 때 빠른 처리를 위해 사용할 수 있다. 방출된 값을 drop하며 이를 수행한다.

또 다른 방법으로 새로운 값이 방출될 때마다 느린 콜렉터를 취소하고 재시작하는 것이 있다.

xxxLatest연산자는 xxx연산자와 비슷한 로직으로 동작하지만 새로운 값에 대해 해당 블록을 취소한다.

 

코드

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value") 
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
}

실행 결과

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 693 ms

collectLatest은 300ms가 소요되지만 새로운 값이 100ms마다 방출된다.

모든 값에 대해 블록이 실행되지만 마지막 값에 대해서만 완료된다.

(100 + 100 + 100 + 300) 약 600ms 소요

반응형