시배's Android

Kotlin Coroutines Deep Dive | 20장 플로우의 실제 구현 본문

Book/Kotlin Coroutines Deep Dive

Kotlin Coroutines Deep Dive | 20장 플로우의 실제 구현

si8ae 2024. 3. 28. 21:40

20장 플로우의 실제 구현

플로우의 내부구현에 대해 알아보자.

Flow 이해하기

간단한 람다식

@Test
fun main() = runTest {
    val f: () -> Unit = {
        println("A")
        println("B")
        println("C")
    }

    f()
}

// 출력 겨로가
A
B
C

플로우는 위의 람다식과 별반 다를것이 없다.

아래는 실제 플로우의 내부 구현을 간단하게 구현해본 것이다.

@Test
fun main() = runTest {
    val f: suspend ((String) -> Unit) -> Unit = { emit ->
        emit("A")
        emit("B")
        emit("C")
    }

    f { println(it) }
    f { println(it) }
}

// 출력 결과
A
B
C
A
B
C

내부 구성이 복잡하니까 함수인터페이스로 추출한다.

fun interface FlowCollector {
    suspend fun emit(string: String)
}

@Test
fun main() = runTest {
    val f: suspend (FlowCollector) -> Unit = {
        it.emit("A")
        it.emit("B")
        it.emit("C")
    }

    f { println(it) }
    f { println(it) }
}

위의 식은 it을 쓰기도 귀찮기에 리시버로 FlowCollector를 사용하면 다음과 같다.

@Test
fun main() = runTest {
    val f: suspend FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }

    f { println(it) }
    f { println(it) }
}

어디서 많이 본 형태이지 않은가? 하지만 이렇게 람다식을 전달하는 것 대신에 인터페이스를 구현하는 편이 더 낫다. (실제 플로우도 이렇게 구현되어 있다.)

fun interface FlowCollector {
    suspend fun emit(string: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

fun flow(
    builder: suspend FlowCollector.() -> Unit
) = object : Flow {
    override suspend fun collect(collector: FlowCollector) = builder {
        collector.emit(it)
    }
}

@Test
fun main() = runTest {
    val f = flow {
        emit("A")
        emit("B")
        emit("C")
    }

    f.collect { println(it) }
    f.collect { println(it) }
}

위의 형태가 실제 플로우의 구현 방식이라고 보아도 좋다. collect를 호출하면 flow빌더를 호출할 때 넣은 람다식이 실행된다.

다른 빌더들도 위와같은 형태로 flow를 빌드한다.

@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

Flow 처리 방식

플로우는 그냥 람다식에 비해 훨씬 복잡하다고 여겨진다. 하지만 플로우의 강력한 점은 플로우를 생성하고, 처리하고, 감지하기 위해 정의한 함수에서 찾을 수 있다.

@Test
fun main() = runBlocking {

    flowOf("A", "B", "C")
        .map {
            delay(1000L)
            it.lowercase()
        }.collect {
            println(it)
        }
}

// 출력 결과
(1초 후)
a
(1초 후)
b
(1초 후)
c

플로우는 다양한 형태의 확장함수를 처리할 수 있도록 제공하고 있으며 이는 다음과 같이 비슷한 형태로 제공된다.

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
    action(value)
    return@transform emit(value)
}

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}

internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    collect { value ->
        return@collect transform(value)
    }
}

동기로 작동하는 flow

플로우 또한 중단 함수처럼 동기로 작동하기에 플로우가 완료될 때 까지 collect의 호출이 중단된다.

즉, 하나의 원소가 collect되기 전까지 내부는 동기적으로 작동함 (플로우는 새로운 코루틴을 시작하지 않는다.)

다음 예를 보면 될 듯

    flowOf("A", "B", "C")
        .map {
            delay(1000L)
            it.lowercase()
        }.collect {
            println(it)
        }

플로우와 공유상태

플로우를 활용할 때는 경쟁상태를 생각해야 한다.

하나의 플로우내부에서는 동기적으로 작동하기에 큰 문제가 없지만,

다양한 플로우에서 사용하는 외부 변수는 동기화가 필수이며 플로우 컬렉션이 아니라 플로우에 종속되게 된다.

다음 예를 보자.

fun Flow<Int>.counter(): Flow<Int> {
    var counter = 0
    return this.map {
        counter++
        List(10000) { Random.nextLong() }.shuffled().sorted() // 특정 작업 수행
        counter
    }
}

@Test
fun main() = runTest {
    launch(Dispatchers.Default) {
        val f1 = List(1_000) { it }.asFlow()
        val f2 = List(1_000) { it }.asFlow().counter()

        launch { println(f1.counter().last()) }
        launch { println(f1.counter().last()) }
        launch { println(f2.last()) }
        launch { println(f2.last()) }
    }
}
// 출력 결과
1000
1000
1991 // 2000보다 작은 값
1999 // 2000보다 작은 값

주의 Dispatcher.Default를 통하지 않으면 단일 쓰레드에서 테스트가 돌아가기에 2000이라는 정확한 값이 출력 됨

두개의 코루틴이 병렬로 원소를 세게 되어 f2.last()는 2000을 반환하게 되는 것이 맞지만 2000을 정확하게 반환하지는 않고(변수를 공유하기에) 2000보다 작은 값을 반환하게 됩니다.

따라서 플로우에서 사용하는 변수가 함수 외부, 클래스의 스코프, 최상위 래벨등에 정의되어 있으면 동기화가 필요합니다.

var counter = 0
fun Flow<Int>.counter(): Flow<Int> {
    return this.map {
        counter++
        List(10000) { Random.nextLong() }.shuffled().sorted() // 특정 작업 수행
        counter
    }
}

@Test
fun main() = runTest {
    launch(Dispatchers.Default) {
        val f1 = List(1_000) { it }.asFlow()
        val f2 = List(1_000) { it }.asFlow().counter()

        launch { println(f1.counter().last()) }
        launch { println(f1.counter().last()) }
        launch { println(f2.last()) }
        launch { println(f2.last()) }
    }
}

// 출력 결과
3968 // 4000보다 작은 값들
3968
3982
3998