연속체 전달 스타일
- 호출되는 함수에 연속체를 보내는 것을 전제로 하고 있어, 함수가 완료되는 대로 연속체를 호출할 것이다.
- 모든 일시 중단 연산은 연속체를 보내고 받고록 변환하는데 이러한 대부분의 복잡한 작업은 컴파일러가 수행한다.
- 일시 중단 연산은 상태 머신으로 변환되는데, 상태를 저장하고 복구하며 한 번에 코드의 한 부분을 실행한다.
연속체
public interface Continuation<in T> {
public val context : CoroutineContext
public fun resume(value : T)
public fun resumeWithException(exception : Throwable)
}
- CoroutineContext는 Continuation과 사용된다.
- resume() 함수는 T 값을 파라미터로 갖는다. 이 값은 일시 중단을 일으킨 작업의 결과다. 따라서 해당 함수가 Int를 반환하는 함수를 호출하기 위해 일시 중지되면, T 값은 정수가 된다.
- resumeWithException() 함수는 예외의 전파를 허용한다.
suspend 한정자
- 코틀린 팀의 구체적인 목표 중 하나는 동시성을 지원하기 위해 가능한 언어 변화를 작게 가져가는 것이었다.
- 코루틴 및 동시성의 지원에 따른 영향은 컴파일러, 표준 라이브러리, 코루틴 라이브러리에서 취하도록 했다.
- suspend 한정자는 주어진 범위의 코드가 연속체를 사용해 동작하도록 컴파일러에게 지시한다.
suspend fun getUserSummary(id : Int) : UserSummary {
logger.log("fetching summary of $id")
val profile = fetchProfile(id)
val age = calculateAge(profile.dateOfBirth)
val terms = validateTerms(profile.country, age)
return UserSummary(profile, age, terms)
}
연속체
- 다른 지점에서 실행을 재개할 수 있는 기본 함수
- 동일한 함수에서 간단히 어떤 호출을 콜백으로 리다이렉트하는 것이다.
- 재개하려면 최소한 라벨이 있어야 한다.
suspend fun getUserSummary(id : Int, cont : Continuation<Any?>) : UserSummary {
val sm = object : CoroutineImpl {
override fun doResume(data : Any?, exception : Throwable?) {
getUserSummary(id, this)
}
}
val state = sm as CoroutineImpl
when(state.label) {
...
}
}
콜백
- getUserSummary()로부터 호출된 다른 일시 중단 함수가 CoroutineImpl을 전달받도록 수정해야 한다.
when(state.label) {
0 -> {
logger.log("fetching summary of $id")
fetchProfile(id, sm)
return
}
1 -> {
calculateAge(profile.dateOfBirth)
validateTerms(profile.country, age, sm)
return
}
2 -> {
UserSummary(profile, age, terms)
}
}
라벨 증분
when(state.label) {
0 -> {
logger.log("fetching summary of $id")
sm.label = 1
fetchProfile(id, sm)
return
}
1 -> {
calculateAge(profile.dateOfBirth)
sm.label = 2
validateTerms(profile.country, age, sm)
return
}
2 -> {
UserSummary(profile, age, terms)
}
}
다른 연산으로부터의 결과 저장
private class GetUserSummarySM : CoroutineImpl {
var value : Any? = null
var exception : Throwable? = null
var cont : Continuation<Any?>? = null
val id : Int? = null
var profile : Profile? = null
var age : Int? = null
var terms : Terms? = null
override fun doResume(data : Any?, exception : Throwable?) {
this.value = data
this.exception = exception
getUserSummary(id, this)
}
}
val sm = cont as? GetUserSummarySM ?: GetUserSummarySM()
when(sm.label) {
0 -> {
sm.cont = cont
logger.log("fetching summary of $id")
sm.label = 1
fetchProfile(id, sm)
return
}
1 -> {
sm.profile = sm.value as Profile
sm.age = calculateAge(sm.profile!!.dateOfBirth)
sm.label = 2
validateTerms(sm.profile!!.country, sm.age!!, sm)
return
}
2 -> {
sm.terms = sm.value as Terms
UserSummary(sm.profile!!, sm.age!!, sm.terms!!)
}
}
일시 중단 연산의 결과 반환
suspend fun getUserSummary(id : Int, cont : Continuation<Any?>) {
val sm = cont as? GetUserSummarySM ?: GetUserSummarySM()
when(sm.label) {
0 -> {
sm.cont = cont
logger.log("fetching summary of $id")
sm.label = 1
fetchProfile(id, sm)
return
}
1 -> {
sm.profile = sm.value as Profile
sm.age = calculateAge(sm.profile!!.dateOfBirth)
sm.label = 2
validateTerms(sm.profile!!.country, sm.age!!, sm)
return
}
2 -> {
sm.terms = sm.value as Terms
sm.cont!!.resume(UserSummary(sm.profile!!, sm.age!!, sm.terms!!))
}
}
}
ContinuationInterceptor
- CoroutineContext는 고유한 키와 함께 저장되는 서로 다른 CoroutineContext.Element를 가지는 맵처럼 동작한다.
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
fun <T> interceptContinuation(cont : Continuation<T>) : Continuation<T>
}
CoroutineDispatcher
- CommonPool, Unconfined 및 DefaultDispatcher와 같이 제공된 모든 디스패처의 구현을 위해 사용되는 ContinuationInterceptor의 추상 구현체이다.
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterCeptor), ContinuationInterceptor {
open fun isDispatchNeeded(context : CoroutineContext) : Boolean = true
abstract fun dispatch(context : CoroutineContext, block : Runnable)
override fun <T> interceptContinuation(continuation : Continuation<T>) : Continuation<T> = DispatchedContinuation(this, continuation)
public operator fun plus(other : CoroutineDispatcher) = other
override fun toString() : String = "$classSimpleName@$hexAddress"
스레드 전환 정리
- 초기 Continuation은 DispatchedContinuation으로 감싸여 있다.
- 여전히 Continuation이지만 Unconfined를 제외한 경우에, 필요에 따라 CoroutineDispatcher로 전달할 수 있다.
- CoroutineDispatcher는 요구사항에 적합한 어떤 실행자든지 사용하게 되며 DispatchedTask를 전송한다.
- DispatchedTask는 Runnable로서, withCoroutineContext()를 사용하는 적절한 컨텍스트를 설정하고, DispatchedContinuation으로부터 resume()와 resumeWithException() 함수를 호출한다.
- 따라서 실제 스레드 변경은 CoroutineDispatcher에서 일어나지만, 실행 전에 연속체를 가로챌 수 있는 전체 파이프라인 덕분에 가능한 것이다.