시배's Android

Kotlin 동시성 프로그래밍 | 7장. 스레드 한정, 액터 그리고 뮤텍스 본문

Book/Kotlin 동시성 프로그래밍

Kotlin 동시성 프로그래밍 | 7장. 스레드 한정, 액터 그리고 뮤텍스

si8ae 2023. 8. 4. 00:02

 원자성 위반

  • 이 유형의 오류는 정확한 동기화 없이 객체의 상태를 동시에 수정할 때 발생한다.
  • 원자성 위반은 코틀린에서도 발생할 수 있지만 오류를 피할 수 있도록 디자인하는 데 도움이 되는 기본형을 제공한다.

원자성의 의미 

  • 연산이 단일하고 분할할 수 없을 때 이 연산을 원자적이라고 한다.
  • 공유 상태에 관해 언급할 때 흔히 많은 스레드에서 하나의 변수를 읽거나 쓰는 것에 대해 이야기한다.
  • 보통 원자적이지 않아서 문제가 발생한다.
  • 공유 상태를 수정하는 코드 블록이 다른 스레드의 변경 시도와 겹치면서 이런 문제가 발생한다.
  • 한 스레드가 현재 값을 바꾸는 중에 아직 쓰지는 않은 상태에서 다른 스레드가 현재 값을 읽을 수 있다.
  • 코드 블록을 원자적으로 만들려면 블록 안에서 발생하는 어떤 메모리 액세스도 동시에 실행되지 않도록 해야 한다.

스레드 한정

  • 공유 상태에 접근하는 모든 코루틴을 단일 스레드에서 실행되도록 한정하는 것을 의미한다.
var counter = 0
val context = newSingleThreadContext("counter")

fun asyncIncrement(by : Int) = async(context) {
	for(i in 0 until by) {
    	counter++
    }
}

액터

  • 상태 액세스를 단일 스레드로 한정하고 다른 스레드가 채널로 상태 수정을 요청할 수 있다.
  • 액터를 사용하면 값을 안전하게 업데이트할 수 있을 뿐만 아니라 이를 위한 강력한 커뮤니케이션 메커니즘도 갖추게 된다.
private var counter = 0
private val context = newSingleThreadContext("counterActor")

fun getCounter() = counter

val actorCounter = actor<Void?>(context) {
	for (msg in channel){
    	counter++
    }
}

fun main() = runBlocking{
	val workerA = asyncIncrement(2000)
    val workerB = asyncIncrement(100)
    
    workerA.await()
    workerB.await()
    print("counter [${getCounter()}]")
}

fun asyncIncrement(by : Int) = async(CommonPool){
	for(i in 0 until by){
    	actorCounter.send(null)
    }
}
  • 액터를 사용하면 채널이 전체 코루틴을 원자적으로 유지하면서 더 높은 유연성을 허용한다는 점이 가장 큰 장점이다.
  • 메시지를 사용해 액터의 기능을 확장할 수 있다.
enum class Action {
	INCREASE,
    DECREASE
}

var actorCounter = actor<Action>(context) {
	for (mag in channel) {
    	when(msg){
        	Action.INCREASE -> counter++
            Action.DECREASE -> counter--
        }
    }
}

fun asyncDecrement(by : Int) = async {
	for (i in 0 until by) {
    	actorCounter.send(Action.DECREASE)
    }
}

fun asyncIncrement(by : Int) = async {
	for (i in 0 until by) {
    	actorCounter.send(Action.INCREASE)
    }
}

fun main() = runBlocking {
	val workerA = asyncIncrement(2000)
    val workerB = asyncIncrement(100)
    val workerC = asyncDecrement(1000)
    
    workerA.await()
    workerB.await()
    workerC.await()
    print("counter [${getCounter()}]")
}

버퍼드 액터

  • 기본적으로 모든 액터는 버퍼링 되지 않는다.
  • 메시지가 수신될 때까지 발신자를 send()에서 일시 중단한다.
  • 버퍼링 된 액터를 생성하려면 capacity 매개변수를 빌더에 전달해야 한다.

CoroutineContext를 갖는 액터

  • CoroutineContext를 전달할 수 있다.
  • 액터의 일시 중단 람다는 주어진 컨텍스트에서 실행될 것이다.

CoroutineStart

  • 기본적으로 액터는 생성되는 즉시 시작된다.
  • CoroutineStart를 전달해 필요에 따라 동작을 변경할 수 있다.

상호배제

  • 상호배제란 한 번에 하나의 코루틴만 코드 블록을 실행할 수 있도록 하는 동기화 메커니즘을 말한다.
  • 코틀린 뮤텍스의 가장 중요한 특징은 블록되지 않는다는 점이다.
  • 실행 대기 중인 코루틴은 잠글을 획득하고 코드 블록을 실행할 수 있을 때까지 일시 중단된다.
  • 코루틴은 일시 중단되지만 일시 중단 함수를 사용하지 않고 뮤텍스를 잠글 수 있다.
  • 자바에 익수하다면 뮤텍스를 넌 블로킹 synchronized로 생각할 수 있다.
  • lock() 일시 중단 함수, unlock() 일시 중단 함수 아님
  • isLocked 현재 잠겨있는지
  • tryLock() 잠글 수 있는지 여부를 나타내는 불리언을 반환

휘발성 변수

스레드 캐시

  • JVM에서 각 스레드는 비휘발성 변수의 캐시 된 사본을 가질 수 있다.
  • 이 캐시는 항상 변수의 실제 값과 동기화되지는 않는다.
  • 한 스레드에서 공유 상태를 변경하면 캐시가 업데이트될 때까지 다른 스레드에서는 볼 수 없다.

@Volatile

  • 변수의 변경사항을 다른 스레드에서 즉시 표시하기 위해 사용할 수 있다.
  • 첫 번째 경우 : 다른 스레드가 읽거나 수정하는 동안 스레드의 읽기가 발생할 때 증분 유실된다.
  • 두 번째 경우 : 다른 스레드가 수정한 후 스레드의 읽기가 발생하지만, 스레드의 로컬 캐시가 업데이트되지 않았을 때
  • 휘발성 변수는 상태를 읽을 때 항상 최신 값을 유지함으로써 두 번째 경우에 보호 기능을 제공한다.
  • 하지만 첫 번째 경우는 보장하지 않는다.
  • @Volatile을 사용하기 좋은 경우
    • 변수 값의 변경은 현재 상태에 의존하지 않는다.
    • 휘발성 변수는 다른 변수에 의존하지 않으며, 다른 변수도 휘발성 변수에 의존하지 않는다.

원자적 데이터 구조

  • AtomicInteger
  • AtomicBoolean
  • incrementAndGet()