사작하기 전에 아래에 있는 UserDownloader 클래스를 살펴보자.
이 클래스는 아이디로 사용자를 받아오거나, 이전에 전송받은 모든 사용자를 얻을 수 있다.
이렇게 구현하면 무슨 문제가 있을까?
class UserDownloader(
private val api: NetworkService,
) {
private val users = mutableListOf<User>()
fun downloaded(): List<User> = users.toList()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
users.add(newUser)
}
}
앞의 구현 방식에는 동시사용에 대한 대비가 되어 있지 않다. fetchUser 호출은 user를 변경한다. 이 경우 같은 시간에 해당 함수가 한개의 스레드에서 시작할 경우에만 정상적으로 작동한다. 같은 시간에 두개 이상의 스레드에서 함수가 호출될 수 있으므로 user는 공유 상태에 해당하며 보호될 필요가 있다. 동시에 리스트를 변경하면 충돌이 일어날 수 있기 때문이다. 아래 예제에서 충돌이 일어 날 수 있는 경우를 확인해보자.
class FakeNetworkService : NetworkService {
override suspend fun fetchUser(id: Int): User {
delay(2)
return User("User$id")
}
}
suspend fun main() {
val downloader = UserDownloader(FakeNetworkService())
coroutineScope {
repeat(1_000_000) {
launch {
downloader.fetchUser(it)
}
}
}
print(downloader.downloaded().size) // ~998242
}
같은 객체와 상호작용하는 스레드가 많기 때문에 위 코드는10000000보다 작은 숫자(예를 들면 998242 같은)를 출력하거나 예외를 던지게 된다.
Exception in thread "main"
java.lang.ArrayIndexOutOfBoundsException: 22
at java.util.ArrayList.add(ArrayList.java:463)
앞서 살펴본 문제는 공유 상태를 변경할때 쉽게 만날 수 있다. 좀더 간단한 예를 들면 하나의 정수를 1씩 증가시키는 스레드가 여러개 있는 경우가 있다. 여기서는 Dispatcher.Default를 사용하는 1000개의 코루틴에서 1000번의 연산을 호출하는 massiveRun을 사용한다. 모든 연산이 끝난 뒤 숫자는 1000000 이 되어야 한다. 하지만 동기화 되지 않으면 충돌이 발생하므로 실제 결과는 이보다 더 작다.
var counter = 0
fun main() = runBlocking {
massiveRun {
counter++
}
println(counter) // ~567231
}
suspend fun massiveRun(action: suspend () -> Unit) =
withContext(Dispatchers.Default) {
repeat(1000) {
launch {
repeat(1000) { action() }
}
}
}
결과가 1,000,000이 아니라는 걸 이해하기 위해 두개의 스레드가 똑같은 시간에 같은 수를 1씩 증가시킨다고 가정해보자. 시작값은 0이다.
첫번째 스레드가 현재 값인 0을 받고 난 뒤 프로세서가 두번째 스레드로 옮기기로 결정한다. 두번째 스레드 또한 0을 받고 1로 증가시킨 뒤 변수에 저장한다. 첫번째 스레드로 다시 옮긴다면 이전에 멈췄을 때 사용한 0을 1로 증가시키고 저장한다. 그 결과 변수는 2가 되어야 하지만 실제로는 1이 되어 버린다. 이 때문에 연산 일부가 반영되지 않는 결과가 일어난다.
동기화 블로킹
위와 같은 문제는 자바에서 사용되는 전통적인 도구인 synchronized 블록이나 동기화된 컬렉션을 사용해 해결할 수 있다.
var counter = 0
fun main() = runBlocking {
vallock = Any()
massiveRun {
synchronized(lock) { // 스레드를 블로킹합니다!
counter++
}
}
println("Counter = $counter") // 1000000
}
이 방법은 작동하긴 하지만 몇가지 문제점이 있다. 가장 큰 문제점은 synchronized 블록 내부에서 중단 함수를 사용할 수 없다는 것이다.
두번째는 synchronized 블록에서 코루틴이 자기 차례를 기다릴 때 스레드를 블로킹한다는 것이다. 디스패쳐의 원리를 생각해보면 코루틴이 스레드를 블로킹하는것은 지양해야 한다. 메인 스레드가 블로킹되면 어떻게 될까? 제한된 수의 스레드만 가지고 있다면 어떨까? 왜 스레드와 같은 자원을 낭비해야 할까? 이러한 방법 대신 코루틴에 특화된 방법을 사용해야 한다. 블로킹 없이 중단하거나 충돌을 회피하는 방법을 사용해야 한다. 지금까지 봤던 방식과는 다른, 코루틴에서 사용하는 방식을 보도록 하자.
원자성
자바에서 간단한 경우에 사용할 수 있는 다른 방법이 있다. 자바는 다양한 원자값을 가지고 있다. 원자값을 활용한 연산은 빠르며 '스레드 안전'을 보장 한다. 이러한 연산을 원자성 연산이라 한다. 원자성 연산은 락없이 로우 레벨로 구현되어 효율적이고 사용하기 쉽다. 사용할 수 있는 원자값의 종류는 다양하다. 여기서는 AtomicInteger를 사용하겠다.
private var counter = AtomicInteger()
fun main() = runBlocking {
massiveRun {
counter.incrementAndGet()
}
println(counter.get()) // 1000000
}
원자값은 의도대로 완벽하게 작동하지만 사용성이 제한 되기 때문에 조심해서 다뤄야 한다. 하나의 연산에서 원자성을 가지고 있다고 해서 전체 연산에서 원자성이 보장 되는 것은 아니기 때문이다.
private var counter = AtomicInteger()
fun main() = runBlocking {
massiveRun {
counter.set(counter.get() + 1)
}
println(counter.get()) // ~430467
}
UserDownloader를 안전하게 사용하기 위해서 읽기만 가능한 사용자 리스트를 AtomicReference로 래핑 할 수도 있다. 충돌 없이 값을 갱신하기 위해 getAndUpdate라는 원자성 보장 함수를 사용한다.
class UserDownloader(
private val api: NetworkService
) {
private val users = AtomicReference(listOf<User>())
fun downloaded(): List<User> = users.get()
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
users.getAndUpdate { it + newUser }
}
}
원자성은 하나의 프리미티브 변수 또는 하나의 레퍼런스의 안전을 보장하기 위해 사용되지만, 좀더 복잡한 경우에는 다른 방법을 사용해야 한다.
싱글 스레드로 제한한 디스패쳐
싱글 스레드 디스패쳐를 사용하는 것이 공유 상태와 관련된 대부분의 문제를 해결하는 가장 쉬운 방법이다.
val dispatcher = Dispatchers.IO.limitedParallelism(1)
var counter = 0
fun main() = runBlocking {
massiveRun {
withContext(dispatcher) {
counter++
}
}
println(counter) // 1000000
}
두 가지 방법으로 디스패처를 사용 할 수 있다. 첫번째 방법은 코스 그레인드 스레드 한정 으로 알려져 있다. 이 방법은 디스패쳐를 싱글스레드로 제한 제한한 withContext로 전체 함수를 래핑하는 방법이다. 사용하기 쉬우며 충돌을 방지할 수 있지만, 함수 전체에서 멀티스레딩의 이점을 누리지 못하는 문제가 있다. 다음 예를 보자. api.fetch(id)는 여러 개의 스레드에서 병렬로 시작할 수 있지만 함수 본체는 싱글스레드로 제한된 디스패쳐에서 실행된다. 그 결과, 블로킹되는 함수 또는 CPU 집약적인 함수를 호출하면 함수 실행이 느려지게 된다.
class UserDownloader(
private val api: NetworkService,
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun downloaded(): List<User> =
withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) = withContext(dispatcher) {
val newUser = api.fetchUser(id)
users += newUser
}
}
두번째 방법은 파인 그레인드 스레드 한정 으로 알려져 있다. 이 방법은 상태를 변경하는 구문들만 래핑한다. 예제 에서 는 user를 사용하는 모든 줄에 해당합니다. 파인 그래인드 스레드 한정은 좀 더 번거롭지만 (예제에서는 fetchUser와 같은) 크리티컬 섹션이 아닌 부분이 블로킹되거나 CPU 집약적인 경우에 더 나은 성능을 제공한다. 일반적인 중단 함수에 적용하는 경우에는 성능에 큰 차이가 없다.
class UserDownloader(
private val api: NetworkService,
) {
private val users = mutableListOf<User>()
private val dispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun downloaded(): List<User> = withContext(dispatcher) {
users.toList()
}
suspend fun fetchUser(id: Int) {
val newUser = api.fetchUser(id)
withContext(dispatcher) {
users += newUser
}
}
}
대부분의 경우, 표준 디스패쳐가 같은 스레드 풀을 사용하기 때문에 싱글 스레드를 가진 디스패쳐를 사용하는건 쉬울 뿐아니라 효율적이다.
뮤텍스
마지막으로 가장 인기 있는 방식은 Mutex 를 사용하는 것이다. 뮤텍스를 단하나의 열쇠가 있는 방이라고 생각할 수 있다.
뮤텍스의 가장 중요한 기능은 lock이다. 첫번째 코루틴이 lock을 호출하면 열쇠를 가지고 중단없이 작업을 수행한다. 또 다른 코루틴이 lock을 호출하면 첫번째 코루틴이 unlock 함수를 함수를 호출할 때까지 중단된다. 또 다른 코루틴이 lock 함수를 호출하면, 마찬가지로 작업을 중단한 뒤에 두번째 코루틴 다음 순서로 큐에 들어가게 된다. 첫번째 코루틴이 unlock 함수를 호출하면 열쇠를 반납하고 두번째 코루틴이 재개한 뒤 lock 함수를 통과하게 된다. 따라서 단 하나의 코루틴만이 lock과 unlock 사이에 있을 수 있다.
suspend fun main() = coroutineScope {
repeat (5) {
launch {
delayAndPrint()
}
}
}
val mutex = Mutex()
suspend fun delayAndPrint() {
mutex.lock() delay(1000)
println("Done")
mutex.unlock()
}
// (1초 후)
// Done
// (1초 후)
// Done
// (1초 후)
// Done
// (1초 후)
// Done
// (1초 후)
// Done
lock 과 unlock 을 직접 사용하는 건 위험한데, 두 함수 사이에서 예외가 발생할 경우(또는 반환이 빠르게 이뤄질 경우) 열쇠를 돌려받을 수 없으며(unlock을 호출할 수 없음), 그 결과 다른 코루틴이 lock을 통과 할 수 없게 된다. 데드락이라 알려진 심각한 문제이다.(누군가 너무 급해 열쇠를 돌려주는 걸 잊는 바람에 화장실을 사용할 수 없는 상황을 떠올리면 됨) 대신 lock으로 시작해 finally블록에서 unlock을 호출하는 withLock함수를 사용하여 블록내에서 어떤 예외가 발생하더라도 자물쇠를 성공적으로 풀 수 있게 할 수 있다. 실제 사용하는 법은 synchronized 블록과 비슷하다.
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
massiveRun {
mutex.withLock {
counter++
}
}
println(counter) // 1000000
}
synchronized 블록과 달리 뮤텍스가 가지는 중요한 이점은 스레드를 블로킹하는 대신 코루틴을 중단한다는 것이다. 좀더 안전하고 가벼운 방식이다. 병렬 실행이 싱글스레드로 제한한 디스패쳐를 사용하는 것과 비교하면 뮤텍스가 가벼우며 좀 더 나은 성능을 가질 수 있다. 하지만 적절히 사용하는 것 또한 더 어렵다. 뮤텍스를 사용할 때 맞닥뜨리는 위험한 경우는 코루틴이 락을 두번 통과할 수 없다는 것이다.
다음 코드를 실행하면 교착상태에 빠지게 되며 영원히 블로킹된 상태로 있게 된다.
suspend fun main() {
val mutex = Mutex()
println("Started")
mutex.withLock {
mutex.withLock {
println("Will never be printed")
}
}
}
// Started
// (영원히 실행됩니다.)
뮤텍스가 가진 두번째 문제점은 코루틴이 중단되었을때 뮤텍스를 풀 수 없다는 점이다. 다음 코르르 보면 delay 중에 뮤텍스가 잠겨 있어 5초 걸리는 것을 확인 할 수 있다.
class MessagesRepository {
private val messages = mutableListOf<String>()
private val mutex = Mutex()
suspend fun add(message: String) = mutex.withLock {
delay(1000) // 네트워크 호출이라 가정합니다.
messages.add(message)
}
}
suspend fun main() {
val repo = MessagesRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repo.add("Message$it")
}
}
}
}
println(timeMillis) // ~5120
}
상글 스레드로 제한 된 디스패쳐를 사용하면 이런 문제는 발생하지 않는다. delay나 네트워크 호출이 코루틴을 중단 시키면 스레드를 다른 코루틴이 사용한다.
class MessagesRepository {
private val messages = mutableList0f<String>()
private val dispatcher = Dispatchers.IO.limitedParallelism(1)
suspend fun add(message: String) =
withContext(dispatcher) {
delay(1000) // 네트워크 호출이라 가정합니다.
messages.add(message)
}
}
suspend fun main() {
val repo = MessagesRepository()
val timeMillis = measureTimeMillis {
coroutineScope {
repeat(5) {
launch {
repo.add("Message$it")
}
}
}
}
println(timeMillis) // 1058
}
따라서 전체 함수를 뮤텍스로 래핑하는 건 지양해야 한다.(코스 그레인드 방식)
뮤텍스를 사용하기로 했다면 락을 두번 걸지 않고 중단 함수를 호출하지 않도록 신경써야 한다.
class MongoUserRepository( /* ... */) : UserRepository {
private val mutex = Mutex()
override suspend fun updateUser(userId: String, userUpdate: UserUpdate): Unit =
mutex.withLock {
// 데이터 갱신은 여러 함수를 호출하는 과정이 아니며,
// 데이터베이스에서 일어나는 일입니다.
// 이 코드는 단지 예제일 뿐입니다.
val currentUser = getUser(userId) // 데드락!
deleteUser(userId) // 데드락!
addUser(currentUser.updated(userUpdate)) // 데드락!
}
override suspend fun getUser(userId: String): User = mutex.withLock { /* ... */ }
override suspend fun deleteUser(userId: String): Unit = mutex.withLock { /* ... */ }
override suspend fun addUser(user: User): User = mutex.withLock { /* ... */ }
}
(공유 상태를 변경하는 곳에서만 래핑하는) 파인 그레인드 스레드 한정이 도움이 될 수 있지만, 위 예제와 같은 경우 필자는 싱글스레드로 제한된 디스패쳐를 더 선호한다.
세마포어
Mutex 에 대해 배웠다면, 비슷한 방식으로 작동하지만 둘 이상이 접근 할 수 있고, 사용법이 다른 세마포어도 알아야 한다. Mutex는 하나의 접근만 허용하므로, lock, unlock, withLock 함수를 가지고 있다. Semaphore는 여러개의 접근을 허용하므로, acquire, release, withPermit 함수를 가지고 있다.
suspend fun main() = coroutineScope {
val semaphore = Semaphore(2)
repeat(5) {
launch {
semaphore.withPermit {
delay(1000)
print(it)
}
}
}
}
// 01
// (1초 후)
// 23
// (1초 후)
// 4
세마포어는 공유 상태로 인한 생기는 문제를 해결할 수 있는 없지만, 동시요청을 처리하는 수를 제한할 때 사용할 수 있어 처리율 제한 장치를 구현할때 도움이 된다.
class LimitedNetworkUserRepository(
private val api: UserApi,
) {
// 동시 요청을 10개로 제한합니다.
private val semaphore = Semaphore(10)
suspend fun requestUser(userId: String) = semaphore.withPermit {
api.requestUser(userId)
}
}
요약
공유상태를 변경할 때 발생할 수 있는 충돌을 피하기 위해 코루틴을 다루는 방법은 다양하다. 가장 많이 쓰이는 방식은 싱글스레드로 제한 된 디스패쳐를 사용해 공유상태를 변경하는 것이다. 동기화가 필요한 특정장소만 매핑하는 파인 그레인드스레드 한정 이나 전체 함수를 래핑하는 코스 그레인드 스레드 한정을 활용할 수 있다. 두번째 방법이 더 쉽지만 성능이 떨어진다. 원자값이나 뮤텍스를 사용하는 방법도 있다.
'도서 > 코틀린 코루틴' 카테고리의 다른 글
플로우란 무엇인가? (0) | 2024.03.24 |
---|---|
3장 채널과 플로우 - 채널 (0) | 2024.03.08 |
2장 코루틴 라이브러리 - 코루틴 스코프 만들기 (1) | 2024.02.10 |
2장 코루틴 라이브러리 - 디스패쳐 (0) | 2024.02.09 |
2장 코루틴 라이브러리 - 코루틴 스코프 함수 (1) | 2024.02.04 |