본문 바로가기
도서/코틀린 코루틴

공유플로우와 상태플로우

by 안스 인민군 2024. 3. 24.

플로우는 콜드 데이터 스트림이기 때문에 요청할 때마다 값이 계산이 된다.

이 때 여러개의 수신자 가 하나의 데이터가 변경되는지 감지하는 경우가 생길 수 있다.

이때 공유플로우를 사용할 수 있다.

공유플로우

공유플로우를 통해 메세지를 방출하면 대기하고 있던 모든 코루틴이 수신하게 된다.

suspend fun main() {
    coroutineScope {
        val flow = MutableSharedFlow<String>()

        launch {
            flow.collect(::println)
        }
        delay(1000)
        flow.emit("message 1")
        flow.emit("message 2")
    }
}
//
1초후
message 1
message 2
프로그램은 게속 실행됨

해당 flow 를 중지시킬려면 부모 코루틴인 coroutineScope 를 취소해야 한다.

 

MutableSharedFlow는 데이터를 방출하는 작업을 유지할 수 있다.

기본값이 replay 0 의 인자를 수정하면 마지막으로 정해진값들을 먼저 받게 된다.

public interface SharedFlow<out T> : Flow<T> {
    public val replayCache: List<T>

    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

suspend fun main() {
    coroutineScope {
        val flow = MutableSharedFlow<String>(replay = 1)

        flow.emit("message 1")
        flow.emit("message 2")
        println(flow.replayCache)

        launch { flow.collect(::println) }
        delay(100)
        flow.resetReplayCache()
        println(flow.replayCache)
    }
}
//
[message 2]
message 2
[]
프로그램은 게속 실행됨

위의 예제 코드를 보면 replay 를 1 로 설정을해, 마지막으로 저장된 message 2 가 출력되는 것을 볼 수 있고,

resetReplayCache 를 해주어 마지막 print 문에서는 null 이 출력되는 것을 확인할 수 있다.

sharedIn

플로우를 sharedFlow 로 변환할려면 sharedIn함수를 사용해야 한다.

suspend fun main() {
    coroutineScope {
        val flow = flowOf(1, 2, 3).onEach { delay(1000) }
        val sharedFlow = flow.shareIn(
            scope = this,
            started = SharingStarted.Eagerly,
            replay = 0
        )

        delay(500)

        launch {
            sharedFlow.collect(::println)
        }

        delay(1000)

        launch {
            sharedFlow.collect(::println)
        }

        delay(1000)

        launch {
            sharedFlow.collect(::println)
        }
    }
}
//
1
2
2
3
3
3

sharedIn함수중 started 인자에 대해 더 알아보자.

  • SharingStarted.Eagerly
    • 즉시 값을 감지하고, 플로우로 값을 전송한다. 이 때 replay 값에 제한이 있고, 감지를 시작하기 전에 값이 나오면 값을 유실할 수 있다.
    • replay 가 0 이라면 먼저 들어온 값을 전부 유실된다.
suspend fun main() {
    coroutineScope {
        val flow = flowOf(1, 2, 3)
        val sharedFlow = flow.shareIn(
            scope = this,
            started = SharingStarted.Eagerly,
            replay = 0 // replay 수만큼 출력된다. 
        )
        
        delay(100)
        launch { sharedFlow.collect(::println) }
        println("done")
    }
}
//
done

SharingStarted.Lazily

  • 이름부터 알 수 있듯이, 첫 번째 구독자가 나올때 감지하기 시작한다. 즉 첫번째 구독자는 값을 보장받을 수 있고,dl이후의 구독자는 replay 수만큼 최근에 저장된 값을 받게된다.
suspend fun main() {
    coroutineScope {
        val flow = flowOf(1, 2, 3)
        val flow1 = flowOf(4).onEach { delay(1000) }
        val sharedFlow = merge(flow,flow1).shareIn(
            scope = this,
            started = SharingStarted.Lazily
        )
        delay(100)
        launch { sharedFlow.collect(::println) }
        delay(1000)
        launch { sharedFlow.collect(::println) }
    }
}
//
1
2
3
4
4

SharingStarted.WhileSubscribed()

  • 첫 번째 구독자가 나올 때 감지하기 시작되며, 마지막 구독자가 사라지면 플로우도 멈춘다.
suspend fun main() {
    coroutineScope {
        val flow = flowOf(1, 2, 3)
            .onStart { println("Started") }
            .onCompletion { println("Finish") }
            .onEach { delay(1000) }

        val sharedFlow = flow.shareIn(
            scope = this,
            started = SharingStarted.WhileSubscribed()
        )

        delay(3000)
        launch { println(sharedFlow.first()) }
        launch { println(sharedFlow.take(2).toList()) }

        delay(3000)
        launch { println(sharedFlow.first()) }
    }
}
//
3초 후
Started
// 1초 후
1
[1, 2]
Finish
// 1초 후
Started
1
// 3초 후
Finish

즉 Room 에서 Flow 로 데이터를 감지할 때 sharedIn 으로 매핑하고, WhileSubscribed 를 인자로 추가하면

구독후의 변경에 감지할 수 있다.

상태플로우

상태플로우는 공유플로우의 개념을 확장시킨 것으로, SharedFlow 의 replay 인자값이 1인 것과 비슷하게 작동한다.

또 차이점은 value 프로퍼티로 접근이 가능하다.

public interface StateFlow<out T> : SharedFlow<T> {
    val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
 
    public fun compareAndSet(expect: T, update: T): Boolean
}

Android 에서는 LiveData 를 대체하는 최신 방식으로 사용하고 있다.

그 이유는

  • Coroutine 을 완벽하게 지원한다.
  • 초기값을 지원하기 때문에, null 이 아니다.

 

상태 플로우는 데이터가 덮어 씌워지기 때문에, 값이 느리게 받아온다면, 중간 값을 못받아 올 수 있다.

모든 값을 다 받아올려면 공유플로우를 사용해야한다.

suspend fun main() {
    coroutineScope {
        val stateFlow = MutableStateFlow(1)

        launch {
            (1..10).forEach{
                delay(300)
                stateFlow.value = it
            }
        }
        stateFlow.collect{
            delay(1000)
            println(it)
        }

    }
}
//
1
3
6
9
10

이는 의도된 것이며, 상태 플로우는 현재 상태만 나타내기 때문에 이전값을 나타낼 수 없다.

sharedIn

sharedIn 함수는 Flow 를 StageFlow 로 변환하는 함수이다.

스코프에서만 호출이 가능하고, 중단 함수이다. 따라서 값이 생성될 때까지 기다려야한다.

suspend fun main() {
    coroutineScope {
     val flow = flowOf(1,2,3)
         .onEach { delay(1000) }
         .onEach { println(it) }

        val stateFlow = flow.stateIn(this)
        println("start")
        println(stateFlow.value)
        stateFlow.collect(::println)
    }
}
//
// 1초 후
1
start
1
1
// 1초 후
2
2
// 1초 후
3
3

statein 인자에 started 을 할당할 수 있는데, sharedIn 에서 사용하던 거랑 동일한 기능을 가진다.

'도서 > 코틀린 코루틴' 카테고리의 다른 글

플로우 생명주기 함수  (0) 2024.03.24
플로우 만들기  (0) 2024.03.24
플로우의 실제 구현  (0) 2024.03.24
핫 데이터 소스와 콜드 데이터 소스  (0) 2024.03.24
플로우란 무엇인가?  (0) 2024.03.24