시배's Android

Kotlin 동시성 프로그래밍 | 9장. 코틀린의 동시성 내부 본문

Book/Kotlin 동시성 프로그래밍

Kotlin 동시성 프로그래밍 | 9장. 코틀린의 동시성 내부

si8ae 2023. 8. 6. 21:46

연속체 전달 스타일

  • 호출되는 함수에 연속체를 보내는 것을 전제로 하고 있어, 함수가 완료되는 대로 연속체를 호출할 것이다.
  • 모든 일시 중단 연산은 연속체를 보내고 받고록 변환하는데 이러한 대부분의 복잡한 작업은 컴파일러가 수행한다.
  • 일시 중단 연산은 상태 머신으로 변환되는데, 상태를 저장하고 복구하며 한 번에 코드의 한 부분을 실행한다.

연속체

public interface Continuation<in T> {
	public val context : CoroutineContext
    public fun resume(value : T)
    public fun resumeWithException(exception : Throwable)
}
  • CoroutineContext는 Continuation과 사용된다.
  • resume() 함수는 T 값을 파라미터로 갖는다. 이 값은 일시 중단을 일으킨 작업의 결과다. 따라서 해당 함수가 Int를 반환하는 함수를 호출하기 위해 일시 중지되면, T 값은 정수가 된다.
  • resumeWithException() 함수는 예외의 전파를 허용한다.

suspend 한정자

  • 코틀린 팀의 구체적인 목표 중 하나는 동시성을 지원하기 위해 가능한 언어 변화를 작게 가져가는 것이었다.
  • 코루틴 및 동시성의 지원에 따른 영향은 컴파일러, 표준 라이브러리, 코루틴 라이브러리에서 취하도록 했다.
  • suspend 한정자는 주어진 범위의 코드가 연속체를 사용해 동작하도록 컴파일러에게 지시한다.
suspend fun getUserSummary(id : Int) : UserSummary {
	// label 0 -> 첫 번째 실행
    logger.log("fetching summary of $id")
    val profile = fetchProfile(id)
    // label 1 -> 다시 시작
    val age = calculateAge(profile.dateOfBirth)
    val terms = validateTerms(profile.country, age)
    // label 2 -> 다시 시작
    return UserSummary(profile, age, terms)
}

연속체

  • 다른 지점에서 실행을 재개할 수 있는 기본 함수
  • 동일한 함수에서 간단히 어떤 호출을 콜백으로 리다이렉트하는 것이다.
  • 재개하려면 최소한 라벨이 있어야 한다.
suspend fun getUserSummary(id : Int, cont : Continuation<Any?>) : UserSummary {
	val sm = object : CoroutineImpl {
    	override fun doResume(data : Any?, exception : Throwable?) {
        	getUserSummary(id, this)
        }
    }
    
    val state = sm as CoroutineImpl
    when(state.label) {
    	...
    }
}

콜백

  • getUserSummary()로부터 호출된 다른 일시 중단 함수가 CoroutineImpl을 전달받도록 수정해야 한다.
when(state.label) {
	0 -> {
    	logger.log("fetching summary of $id")
        fetchProfile(id, sm)
        return
   	}
    1 -> {
    	calculateAge(profile.dateOfBirth)
        validateTerms(profile.country, age, sm)
        return
    }
    2 -> {
    	UserSummary(profile, age, terms)
    }
}

라벨 증분

when(state.label) {
	0 -> {
    	logger.log("fetching summary of $id")
        sm.label = 1
        fetchProfile(id, sm)
        return
   	}
    1 -> {
    	calculateAge(profile.dateOfBirth)
        sm.label = 2
        validateTerms(profile.country, age, sm)
        return
    }
    2 -> {
    	UserSummary(profile, age, terms)
    }
}

다른 연산으로부터의 결과 저장

private class GetUserSummarySM : CoroutineImpl {
	var value : Any? = null
    var exception : Throwable? = null
    var cont : Continuation<Any?>? = null
    val id : Int? = null
    
    var profile : Profile? = null 
    var age : Int? = null
    var terms : Terms? = null
    
    override fun doResume(data : Any?, exception : Throwable?) {
    	this.value = data
        this.exception = exception
        getUserSummary(id, this)
    }
}
val sm = cont as? GetUserSummarySM ?: GetUserSummarySM()

when(sm.label) {
	0 -> {
    	sm.cont = cont
        logger.log("fetching summary of $id")
        sm.label = 1
        fetchProfile(id, sm)
        return
    }
    1 -> {
    	sm.profile = sm.value as Profile
        sm.age = calculateAge(sm.profile!!.dateOfBirth)
        sm.label = 2
        validateTerms(sm.profile!!.country, sm.age!!, sm)
        return
    }
    2 -> {
    	sm.terms = sm.value as Terms
        UserSummary(sm.profile!!, sm.age!!, sm.terms!!)
    }
}

일시 중단 연산의 결과 반환

suspend fun getUserSummary(id : Int, cont : Continuation<Any?>) {
	val sm = cont as? GetUserSummarySM ?: GetUserSummarySM()

	when(sm.label) {
		0 -> { // label 0 -> 첫 번째 실행
   	 		sm.cont = cont
        	logger.log("fetching summary of $id")
        	sm.label = 1
        	fetchProfile(id, sm)
        	return
    	}
    	1 -> { // label 1 -> 다시 시작
    		sm.profile = sm.value as Profile
        	sm.age = calculateAge(sm.profile!!.dateOfBirth)
       		sm.label = 2
        	validateTerms(sm.profile!!.country, sm.age!!, sm)
        	return
    	}
    	2 -> { // label 2 -> 다시 시작 및 종료
    		sm.terms = sm.value as Terms
            sm.cont!!.resume(UserSummary(sm.profile!!, sm.age!!, sm.terms!!))
    	}
	}
}

ContinuationInterceptor

  • CoroutineContext는 고유한 키와 함께 저장되는 서로 다른 CoroutineContext.Element를 가지는 맵처럼 동작한다.
public interface ContinuationInterceptor : CoroutineContext.Element {
	companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    fun <T> interceptContinuation(cont : Continuation<T>) : Continuation<T>
}

CoroutineDispatcher

  • CommonPool, Unconfined 및 DefaultDispatcher와 같이 제공된 모든 디스패처의 구현을 위해 사용되는 ContinuationInterceptor의 추상 구현체이다.
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterCeptor), ContinuationInterceptor {
	open fun isDispatchNeeded(context : CoroutineContext) : Boolean = true
    abstract fun dispatch(context : CoroutineContext, block : Runnable)
    override fun <T> interceptContinuation(continuation : Continuation<T>) : Continuation<T> = DispatchedContinuation(this, continuation)
    public operator fun plus(other : CoroutineDispatcher) = other
    override fun toString() : String = "$classSimpleName@$hexAddress"

스레드 전환 정리

  • 초기 Continuation은 DispatchedContinuation으로 감싸여 있다.
  • 여전히 Continuation이지만 Unconfined를 제외한 경우에, 필요에 따라 CoroutineDispatcher로 전달할 수 있다.
  • CoroutineDispatcher는 요구사항에 적합한 어떤 실행자든지 사용하게 되며 DispatchedTask를 전송한다.
  • DispatchedTask는 Runnable로서, withCoroutineContext()를 사용하는 적절한 컨텍스트를 설정하고, DispatchedContinuation으로부터 resume()와 resumeWithException() 함수를 호출한다.
  • 따라서 실제 스레드 변경은 CoroutineDispatcher에서 일어나지만, 실행 전에 연속체를 가로챌 수 있는 전체 파이프라인 덕분에 가능한 것이다.