본문 바로가기

Android

flatMapConcat, flatMapMerge, flatMapLatest 차이

반응형

Flow 관련 로직을 구현하다가 flatMapConcat에서는 동작하지 않던 기능이 flatMapMerge, flatMapLatest로 변경하니 동작하는 경험을 하여 세 메소드의 차이를 알아보았습니다.

Flattening flow: flatMapConcat, flatMapMerge and flatMapLatest

 

Flattening flow: flatMapConcat, flatMapMerge and flatMapLatest

Let's learn how can we flatMap flow with flatMapConcat, flatMapMerge, and flatMapLatest.

kt.academy

flatMap은 콜렉션에서 사용할 수 있는 유용한 기능입니다. 콜렉션에서는 map과 비슷하지만 변환 함수 내에서 콜렉션을 반환하여야 하며, 콜렉션을 flatten 할 수 있습니다. 예를 들어, employ 리스트를 갖고 있는 department가 리스트로 이루어져 있다면, 다음과 같이 직원 목록을 뽑아낼 수 있습니다.

val allEmployees: List<Employee> = departments
    .flatMap { department -> department.employees }

// map을 썼을 경우 이중 리스트를 반환합니다.
val listOfListOfEmployee: List<List<Employee>> = departments
    .map { department -> department.employees }

flow 내에서 flatMap은 어떻게 동작해야 할까요? flatten 된 flow를 반환해야 할 것 같습니다. 문제는 flow의 각 요소는 시간 텀을 두고 존재한다는 것입니다. 두 번째 요소에서 생성된 flow는 첫 번째 요소에서 생성된 flow를 기다려야할까요? 아니면 concurrent 하게 처리해야 할까요? 이러한 의문에 대한 정답은 없기 때문에 flow에는 flatMap 함수 대신 flatMapConcat, flatMapMerge, flatMapLatest가 존재합니다.

flatMapConcat 함수는 생성된 flow를 차례대로 처리합니다. 첫 번째 flow가 완료되어야 두 번째 flow가 시작될 수 있습니다. 예제와 함께 확인해봅시다. "A", "B", "C"로 flow를 생성했습니다. 각 문자로부터 1, 2, 3과 문자가 결합된 flow가 1초 간격으로 생성됩니다.

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { flowFrom(it) }
        .collect { println(it) }
}

fun flowFrom(ele: String) = flowOf(1, 2, 3)
    .onEach { delay(1000L) }
    .map { "${it}_${ele}" }
// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

flatMapConcat order

flatMapMerge는 생성된 flow를 동시적으로 처리합니다.

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge { flowFrom(it) }
        .collect { println(it) }
}

fun flowFrom(ele: String) = flowOf(1, 2, 3)
    .onEach { delay(1000L) }
    .map { "${it}_${ele}" }
// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C

flatMapMerge order

동시에 처리 가능한 flow의 수는 concurrency 파라미터를 사용하여 설정할 수 있습니다. 기본 값은 16이며, JVM에서 DEFAULT_CONCURRENCY_PROPERTY_NAME 속성을 변경하여 기본 값을 변경할 수 있습니다. concurrency 설정 값보다 많은 요소를 가진 flow에서 flatMapMerge를 사용할 경우 의도한대로 동작하지 않을 수 있어 주의가 필요합니다.

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge(concurrency = 2) { flowFrom(it) }
        .collect { println(it) }
}

fun flowFrom(ele: String) = flowOf(1, 2, 3)
    .onEach { delay(1000L) }
    .map { "${it}_${ele}" }
// (1 sec)
// 1_A
// 1_B
// (1 sec)
// 2_A
// 2_B
// (1 sec)
// 3_A
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

일반적으로 flatMapMerge는 flow의 각 요소를 사용하여 데이터를 요청할 때 사용합니다. 예를 들어, 많은 수의 카테고리가 있고, 각각의 카테고리에 요청이 필요할 때 flatMapMerge를 사용할 수 있습니다. 이러한 요청은 async 함수로도 가능하지만, flatMapMerge를 사용하면 두 가지 이점이 있습니다:

  • concurrency 파라미터를 조정하여 한 번에 몇 개의 카테고리를 요청할 지 제한할 수 있습니다.
  • flow를 반환하고 다음 요소가 도착하는대로 즉시 처리 가능합니다.
suspend fun getOffers(
    categories: List<Category>
): List<Offer> = coroutineScope {
    categories
        .map { async { api.requestOffers(it) } }
        .flatMap { it.await() }
}

// A better solution
suspend fun getOffers(
    categories: List<Category>
): Flow<Offer> = categories
    .asFlow()
    .flatMapMerge(concurrency = 20) {
        suspend { api.requestOffers(it) }.asFlow()
        // or flow { emit(api.requestOffers(it)) }
    }

마지막 함수인 flatMapLatest는 새로운 flow가 도착하면 이전 flow를 잊어버립니다. 예제의 "A", "B", "C" 전달에는 딜레이가 없어 "1_C", "2_C", "3_C"만 보일 것입니다.

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}

fun flowFrom(ele: String) = flowOf(1, 2, 3)
    .onEach { delay(1000L) }
    .map { "${it}_${ele}" }
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

flatMapLatest order

초기 flow의 요소에 딜레이가 추가되면 더 흥미로워집니다. 아래 예에서 일어나는 일은 (1.2초 후) "A"가 flowFrom을 사용하여 생성된 흐름을 시작한다는 것입니다. 이 flow는 1초 안에 "1_A" 요소를 생성하지만 200ms 후에 "B"가 나타나 이전 flow가 닫히고 잊혀집니다. "B" 흐름은 "C"가 나타났을 때 "1_B"를 생성하고 흐름을 생성하기 시작했습니다. 이후로는 flow가 나타나지 않아 1초 간격으로 "1_C", "2_C" 및 "3_C" 요소가 생성됩니다.

suspend fun main() {
    flowOf("A", "B", "C")
        .onEach { delay(1200L) }
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}

fun flowFrom(ele: String) = flowOf(1, 2, 3)
    .onEach { delay(1000L) }
    .map { "${it}_${ele}" }
// (1.2 sec)
// (1 sec)
// 1_A
// (0.2 sec)
// (1 sec)
// 1_B
// (0.2 sec)
// (1 sec)
// 1_C
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

flatMapLatest canceled

반응형