Coroutines guide, kotlinx-coroutines-core / flow

 

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/

 

kotlinx-coroutines-core

Core primitives to work with coroutines. Coroutine builder functions: Coroutine dispatchers implementing CoroutineDispatcher: More context elements: Synchronization primitives for coroutines: Top-level suspending functions: NameDescriptiondelayNon-blocking

kotlin.github.io

 

  1. SharedFlow 
  2. shareIn
  3. onSubscription
  4. conflate
  5. stateFlow
  6. toStates
  7. scan
  8. reduce
  9. merge
  10. snapshotFlow
  11. onStart
  12. onEach

 

1. SharedFlow

 

A hotFlow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors. This is opposed to a regular Flow, such as defined by the flow { ... } function, which is cold and is started separately for each collector.

SharedFlow는 broadcast 되고 있는 collector들의 모든 emitted values를 share 하는 "hot" Flow 이다.

모든 collector들은 모든 emitted values를 collecting한다.

기존의 Flow { } operator로 만들던 flow는 "cold"이며 이는 각각의 collector들이 started 되었을때 독립적으로 동작함을 의미한다.

반면에, shared flow는 hot이며 collector들이 존재함과는 독립적으로 active instance가 존재함을 의미한다(즉, collector의 start 여부와 상관없이 flow는 operating한다)

 

Shared flow never completes. A call to Flow.collect on a shared flow never completes normally, and neither does a coroutine started by the Flow.launchIn function. An active collector of a shared flow is called a subscriber.

Shared flow는 completes가 되지 않는다. (Rx의 subject와 유사한 개념인듯)

launchIn에 의해 코루틴이 시작되지 않고, complete되지도 않는 특징을 가진다.

active collector of shared flow는 subscriber라고 말한다(이것도 rx- subject와 비슷)

 

A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running is cancelled. A subscriber to a shared flow is always cancellable, and checks for cancellation before each emission. Note that most terminal operators like Flow.toList would also not complete, when applied to a shared flow, but flow-truncating operators like Flow.take and Flow.takeWhile can be used on a shared flow to turn it into a completing one.

subscriber는 cancelled 될 수 있다. 보통은, running을 하고 있던 coroutine이 cancelled 되면, subscriber는 cancelled된다.

shared flow에 applied된 Flow.toList는 complete 되지 않지만 take나 takeWhile로 flow-truncating operator로 처리하게 되면 completing one으로 바뀐다.

 

A mutable shared flow is created using the MutableSharedFlow(...) constructor function. Its state can be updated by emitting values to it and performing other operations. See the MutableSharedFlow documentation for details.

SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go.

mutable shared flow는 MutableSharedFlow 생성자를 통해 생성한다. its state는 emitting value를 하면서 updated 될 수 있다. SharedFlow는 event를 broadcating 하여 내부 subscriber에 전달하는데 유용하다.

 

For example, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers receive emitted event:

class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

As an alternative to the above usage with the MutableSharedFlow(...) constructor function, any coldFlow can be converted to a shared flow using the shareIn operator.

MutableSharedFlow 대신 coldFlow를 sharedFlow로 바꾸려면 shareIn operator를 사용할 수 있다

 

There is a specialized implementation of shared flow for the case where the most recent state value needs to be shared. See StateFlow for details.

 

예시코드 - 2022.10.28 추가 )

HotFlow와 ColdFlow를 비교하면 다음과 같다.

 

아래의 코드를 실행시키면, hotFlow(val sFlow) 과 cold(val nFlow)가 각각 launch { }에 의해 parallelly executing한다. 하지만, hot의 경우 먼저 class instance되어 emitting 되고 있으며 cold는 collect 호출이후 activate되어 emitting이 시작된다. 결과는 아래 printed와 같이 됨.

class FlowHandler(
    externalScope: CoroutineScope,
    private val intervalMs: Long = 1000
) {
    private val _sFlow = MutableSharedFlow<String>(replay = 0)
    val sFlow: SharedFlow<String> = _sFlow

    init {
        var checker = 0
        externalScope.launch {
            while(true) {
                _sFlow.emit("$checker - ${(1000..2000).random()}")
                checker++
                delay(intervalMs)
            }
        }
    }
}

fun main(): Unit = runBlocking {

    val coroutineScope = CoroutineScope(currentCoroutineContext())
    val flowHandler = FlowHandler(coroutineScope)
    var checker = 0
    
    val nFlow : Flow<String> = flow {
        repeat(100){
            checker++
            delay(5000)
            emit("$checker - ${(1000..2000).random()}")
        }
    }
    delay(9000)

    launch {
        nFlow.collect{aString ->
            println("nFlow : $aString" )
        }
    }
    launch {
        flowHandler.sFlow.collect{ aString ->
            println("sFlow : $aString" )
        }
    }
}

9000ms를 딜레이 하고 시작하였기에 sFlow의 emitting 1~8까지는 collecting되지 않았음을 위 print를 통해 알 수 있다.

 

SharedFlow/Flow 비교

 

참고 : anroid-flow 에 의한 응용적인 측면에서 접근 link : https://developer.android.com/kotlin/flow/stateflow-and-sharedflow?hl=ko

 

Replay cache and buffer

A shared flow keeps a specific number of the most recent values in its replay cache. Every new subscriber first gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is specified when the shared flow is created by the replay parameter. A snapshot of the current replay cache is available via the replayCache property and it can be reset with the MutableSharedFlow.resetReplayCache function.

 

A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the extraBufferCapacity parameter.

 

A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using the onBufferOverflow parameter, which is equal to one of the entries of the BufferOverflow enum. When a strategy other than SUSPENDED is configured, emissions to the shared flow never suspend.

 

Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept the new value. In the absence of subscribers only the most recent replay values are stored and the buffer overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers emitter never suspends despite BufferOverflow.SUSPEND option and BufferOverflow.DROP_LATEST option does not have effect either. Essentially, the behavior in the absence of subscribers is always similar to BufferOverflow.DROP_OLDEST, but the buffer is just of replay size (without any extraBufferCapacity).

 

요약 : replay는 hot flow상 이미 지나간 element를 다시 재생하는 방법에 관련한 메소드를 제공하는 것이며. 이에 대한 세부 파라미터 조정은 필요하다면 이 페이지를 다시 참고하여 구현하면 된다.

 

Unbuffered shared flow

A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).

 

파라미터 없이 MutableSharedFlow()로 생성한 flow는 additional buffer나 cache가 없다. shared flow는 subscriber가 수신할때까지 emit call이 suspend되며 no subscriber인 경우 immediately return된다. 따라서 tryEmit은 호출은 성공하고 subscriber가 없을 경우 true로 return 된다.

 

SharedFlow vs BroadcastChannel

Conceptually shared flow is similar to BroadcastChannel and is designed to completely replace it. It has the following important differences:

  • SharedFlow is simpler, because it does not have to implement all the Channel APIs, which allows for faster and simpler implementation.
  • SharedFlow supports configurable replay and buffer overflow strategy.
  • SharedFlow has a clear separation into a read-only SharedFlow interface and a MutableSharedFlow.
  • SharedFlow cannot be closed like BroadcastChannel and can never represent a failure. All errors and completion signals should be explicitly materialized if needed.

To migrate BroadcastChannel usage to SharedFlow, start by replacing usages of the BroadcastChannel(capacity) constructor with MutableSharedFlow(0, extraBufferCapacity=capacity) (broadcast channel does not replay values to new subscribers). Replace send and trySend calls with emit and tryEmit, and convert subscribers' code to flow operators.

브로드 캐스트 채널과 유사한 SharedFlow는 위의 장점과 전략을 지닌다.

 

Concurrency

All methods of shared flow are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.

 

shared flow의 모든 메소드는 thread-safe 하다. 외부 동기화 없이 동시 코루틴에서 안전 호출을 할 수 있다

 

Operator fusion

Application of flowOn, buffer with RENDEZVOUS capacity, or cancellable operators to a shared flow has no effect.

Implementation notes

Shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are resumed outside of this lock to avoid dead-locks when using unconfined coroutines. Adding new subscribers has O(1) amortized cost, but emitting has O(N) cost, where N is the number of subscribers.

 

subscribe 수만큼 시간 비용이 발생하고, 추가할때는 O(1)의 비용.  데드락과 같은 고려는 내부에서 알아서 처리해준다는 의미인듯

 

Not stable for inheritance

The SharedFlow interface is not stable for inheritance in 3rd party libraries, as new methods might be added to this interface in the future, but is stable for use. Use the MutableSharedFlow(replay, ...) constructor function to create an implementation.

 

 

 

 

 

2. shareIn  

 

 

shareIn

shareIn common Converts a coldFlow into a hotSharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying a specified number of replay val

kotlin.github.io

Converts a coldFlow into a hotSharedFlow that is started in the given coroutine scope, sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers, and replaying a specified number of replay values to new subscribers. See the SharedFlow documentation for the general concepts of shared flows.

coldFlow를 hotSharedFlow로 변환한다.

 

The starting of the sharing coroutine is controlled by the started parameter. The following options are supported.

  • Eagerly — the upstream flow is started even before the first subscriber appears. Note that in this case all values emitted by the upstream beyond the most recent values as specified by replay parameter will be immediately discarded.
    subscriber가 존재하지 않더라도 upstream shared flow는 바로 시작되며 중간에 중지되지 않는다. 업스트림에서 emitted 된 values은 삭제된다.
  • Lazily — starts the upstream flow after the first subscriber appears, which guarantees that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to get the most recent replay values. The upstream flow continues to be active even when all subscribers disappear, but only the most recent replay values are cached without subscribers.
    최초의 subscriber 가 등장하면 그떄부터 upstream flow는 시작한다. 첫번째 subscriber는 all of emitted values를 get하며, 후속된 subscriber는 most recent replay values만큼의 값을 only guaranteed하여 get하게 된다. 모든 subscriber가 없어지더라도 한번 시작한 upstream flow는 continuously activated하며, caching 되는 value size또한 지정해둔 개수만큼 유지되며 동작한다.
  • WhileSubscribed() — starts the upstream flow when the first subscriber appears, immediately stops when the last subscriber disappears, keeping the replay cache forever. It has additional optional configuration parameters as explained in its documentation.
    first subscriber가 등장했을때 upstream flow가 시작된다. last subscriber가 disappear되면 flow는 즉시 중단된다. 중단이 되면서 replay cache는 foreverly 지속된다(detail fitting을 위한 optional argument를 지원하며 이에 대한 설명은 아래에 있음)

  • A custom strategy can be supplied by implementing the SharingStarted interface.
    SharingStarted interface를 사용하면 customizing이 가능하다.

The shareIn operator is useful in situations when there is a cold flow that is expensive to create and/or to maintain, but there are multiple subscribers that need to collect its values. For example, consider a flow of messages coming from a backend over the expensive network connection, taking a lot of time to establish. Conceptually, it might be implemented like this:

cold flow는 create/maintain에 있어 유지 비용이 큰 편이며 shareIn operator는 이를 구현하는데 유용한 기능을 제공한다.  아래 예제를 보면,

val backendMessages: Flow<Message> = flow {
    connectToBackend() // takes a lot of time
    try {
      while (true) {
          emit(receiveMessageFromBackend())
      }
    } finally {
        disconnectFromBackend()
    }
}

If this flow is directly used in the application, then every time it is collected a fresh connection is established, and it will take a while before messages start flowing. However, we can share a single connection and establish it eagerly like this:

val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)

Now a single connection is shared between all collectors from messages, and there is a chance that the connection is already established by the time it is needed.

위와 같이 shareIn 키워드로 Hot flow로 변경하여 messages를 처리할 수 있다

 

예시코드 - 2022.10.28 추가 )

아래는 sharedIn을 추가하여 hotFlow로 바꿔서 출력해본 예제이다

fun main(): Unit = runBlocking {

    val coroutineScope = CoroutineScope(currentCoroutineContext())
    var checker = 0

    val nFlow : Flow<String> = flow {
        repeat(100){
            checker++
            delay(1000)
            emit("$checker - ${(1000..2000).random()}")
        }
    }.shareIn(coroutineScope, SharingStarted.Lazily)

    delay(9000)

    launch {
        nFlow.collect{aString ->
            println("nFlow : $aString" )
        }
    }
}

위와 같이 Sharing option을 lazily 줄 경우 buffer에 의해 저장된 부분부터 collecting 할 수 있다.

9000ms 대기 한후 lazily collecting 한 경우

 

 

Sharing option을 Eagerly로 준 경우,  아래와 같이 collecting 하는 시점 이전의 elements는 dropped 된다.

 

shareIn(coroutineScope, SharingStarted.Eagerly, replay = 3) 와 같은 replay 옵션을 주면 skip되었던 부분중 최근 3개까지의 elements가 buffered되어 아래와 같이 collecting 된다.

 

설명추가 (2022.10.31) - SharingStarted.WhileSubscribed 옵션 (참고 : https://tourspace.tistory.com/434
 coroutineScope과 SharingStarted param은 stateIn과 sharedIn에서 동일하게 사용된다.


1> stopTimeoutMillis: 구독자가 모두 사라진 이후에 flow가 정지되기까지의 delay를 설정할 수 있다. 0이면 last subscriber가 disappear 되면, 그 즉시 flow가 suspend된다.

2> replayExpriationMilis
: replay를 위해 cache 한 값을 유지할 시간을 지정합니다. 구독자가 모두 사라지는 순간 이후로 정해진 시간만큼 대기하다가 cache를 reset 시킵니다. 기본값은 Long의 Max이므로 영원이 cache 되나, 1000ms 또는 5000ms 등 특정 시간을 지정할 경우 해당 시간 이후로 구독자가 생기지 않으면 repaly를 위해 저장했던 값을 모두 초기화시킵니다. stateIn의 경우 초기값이 설정되며, sharedIn의 경우 emtpy 상태가 됩니다. [8] 만약 구독자가 모두 사라지고 cache 삭제를 대기 중에 구독자가 다시 들어온다면 cache가 그대로 유지됩니다(더 많은 예시 코드는 참고 페이지에서 확인가능)

 

 

Upstream completion and error handling

Normal completion of the upstream flow has no effect on subscribers, and the sharing coroutine continues to run. If a strategy like SharingStarted.WhileSubscribed is used, then the upstream can get restarted again. If a special action on upstream completion is needed, then an onCompletion operator can be used before the shareIn operator to emit a special value in this case, like this:

upstream flow의 normal completion은 subscriber에 effect를 미치지 않는다. 또한 공유 코루틴은 continuously run한다.

onCompletion operator를 사용하여 shareIn operator가 emit 하기 전에 special value에 대한 특별한 동작을 유도할 수 있다. 아래 예제와 같이 사용할 수 있다

backendMessages
    .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
    .shareIn(scope, SharingStarted.Eagerly)

Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers, and will be handled by the scope in which the sharing coroutine is launched. Custom exception handling can be configured by using the catch or retry operators before the shareIn operator. For example, to retry connection on any IOException with 1 second delay between attempts, use:

예외 발생 시, Subscriber에 영향을 주지않고 upstream에서 아래와 같이 처리할 수 있다. 이는 sharing coroutine이 launched 되는 지점에서 error에 대해 handling하는 것이다.Custom exception handling은 catch와 retry를 사용하여 configuring할 수 있다

val messages = backendMessages
    .retry { e ->
        val shallRetry = e is IOException // other exception are bugs - handle them
        if (shallRetry) delay(1000)
        shallRetry
    }
    .shareIn(scope, SharingStarted.Eagerly)

Initial value

When a special initial value is needed to signal to subscribers that the upstream is still loading the data, use the onStart operator on the upstream flow. For example:

backendMessages
    .onStart { emit(UpstreamIsStartingMessage) }
    .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message

Buffering and conflation

The shareIn operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained in the buffer operator's description, using a buffer of replay size or the default (whichever is larger). This default buffering can be overridden with an explicit buffer configuration by preceding the shareIn call with buffer or conflate, for example:

  • buffer(0).shareIn(scope, started, 0) — overrides the default buffer size and creates a SharedFlow without a buffer. Effectively, it configures sequential processing between the upstream emitter and subscribers, as the emitter is suspended until all subscribers process the value. Note, that the value is still immediately discarded when there are no subscribers.
    버퍼가 존재 하지않으며 subscriber가 process the value를 할 때까지 the emitter는 suspended된다. 이는 sequential한 processing 방법이다.
  • buffer(b).shareIn(scope, started, r) — creates a SharedFlow with replay = r and extraBufferCapacity = b.
  • conflate().shareIn(scope, started, r) — creates a SharedFlow with replay = r, onBufferOverflow = DROP_OLDEST, and extraBufferCapacity = 1 when replay == 0 to support this strategy.

explicitly 버퍼 사이즈에 대한 옵션을 shareIn operator와 함께 사용할 수 있다.

buffer 혹은 conflate 옵션에 대한 동작은 위와 같다.

 

 

 

3. onSubscription

 

fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T>

Returns a flow that invokes the given action after this shared flow starts to be collected (after the subscription is registered).

The action is called before any value is emitted from the upstream flow to this subscription but after the subscription is established. It is guaranteed that all emissions to the upstream flow that happen inside or immediately after this onSubscription action will be collected by this subscription.

The receiver of the action is FlowCollector, so onSubscription can emit additional elements.

 

 

4. conflate(https://witcheryoon.tistory.com/291,  conflation 부분 참고)

 

fun <T> Flow<T>.conflate(): Flow<T>

Conflates flow emissions via conflated channel and runs collector in a separate coroutine. The effect of this is that emitter is never suspended due to a slow collector, but collector always gets the most recent value emitted.

독립적인 코루틴을 통해 collector가 동작하여 각 emission을 조건에 맞게 받아들이는 역할을 한다. emitter는 never suspended이며, collector는 항상 recent value emitted를 get한다.

 

For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:

val flow = flow {
    for (i in 1..30) {
        delay(100)
        emit(i)
    }
}

Applying conflate() operator to it allows a collector that delays 1 second on each element to get integers 1, 10, 20, 30:

val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)

Note that conflate operator is a shortcut for buffer with capacity of Channel.CONFLATED, with is, in turn, a shortcut to a buffer that only keeps the latest element as created by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).

Operator fusion

Adjacent applications of conflate/buffer, channelFlow, flowOn and produceIn are always fused so that only one properly configured channel is used for execution. Conflation takes precedence over buffer() calls with any other capacity.

Note that any instance of StateFlow already behaves as if conflate operator is applied to it, so applying conflate to a StateFlow has no effect. See StateFlow documentation on Operator Fusion.

 

 

5. stateFlow

- recently explain updated at 2022.10.30

interface StateFlow<out T> : SharedFlow<T>

A SharedFlow that represents a read-only state with a single updatable data value that emits updates to the value to its collectors. A state flow is a hot flow because its active instance exists independently of the presence of collectors. Its current value can be retrieved via the value property.

 

StateFlow는 값에 대한 update를 해당 collectors로 내보내는 updatable single data value로써 read-only 상태를 나타내는 SharedFlow이다. 따라서 stateFlow는 hot flow이며, collector의 존재와 별개로 active instance에 의해 동작한다

 

State flow never completes. A call to Flow.collect on a state flow never completes normally, and neither does a coroutine started by the Flow.launchIn function. An active collector of a state flow is called a subscriber.

State flow 는 complete되지 않는다. collect에 의해 normally complete되지 않고 또한, launchIn 메소드에 의해 시작되지도 않는다. state flow의 active collector는 subscriber라 부른다

 

A mutable state flow is created using MutableStateFlow(value) constructor function with the initial value. The value of mutable state flow can be updated by setting its value property. Updates to the value are always conflated. So a slow collector skips fast updates, but always collects the most recently emitted value.

mutable state flow 의 update는 항상 conflated 하므로 slow collector는 fast update를 skip하나 항상 collect 시점에서는 most recently emitted value를 수집한다.

 

StateFlow is useful as a data-model class to represent any kind of state. Derived values can be defined using various operators on the flows, with combine operator being especially useful to combine values from multiple state flows using arbitrary functions.

stateFlow는 state를 관리하는 data model class로서 유용하다.

 

For example, the following class encapsulates an integer state and increments its value on each call to inc:

class CounterModel {
    private val _counter = MutableStateFlow(0) // private mutable state flow
    val counter = _counter.asStateFlow() // publicly exposed as read-only state flow

    fun inc() {
        _counter.update { count -> count + 1 } // atomic, safe for concurrent use
    }
}

Having two instances of the above CounterModel class one can define the sum of their counters like this:

val aModel = CounterModel()
val bModel = CounterModel()
val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }

 

예시코드 - 2022.10.30 추가 ) sumFlow는 개별 stateFlow의 state변화를 감지하는 flow가 된다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class CounterModel {
    private val _counter = MutableStateFlow(0) // private mutable state flow
    val counter: StateFlow<Int> = _counter.asStateFlow() // publicly exposed as read-only state flow

    fun inc() {
        _counter.update { count -> count + 1 } // atomic, safe for concurrent use
    }
}


fun main() = runBlocking<Unit> {

    val aModel = CounterModel()
    val bModel = CounterModel()
    val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }

    var job : Job? = null
    launch {
        delay(1000L)
        aModel.inc()
        println("aModel updated")

        bModel.inc()
        println("bModel updated")

        delay(2000L)
        job?.cancel()
        println("job Terminated")
    }

    job = launch {
        sumFlow.collect{
            println("sumFlow is printed : $it")
        }
    }

}

simple data flow for controlling stateFlows

 

printed result of above code

 

 

As an alternative to the above usage with the MutableStateFlow(...) constructor function, any coldFlow can be converted to a state flow using the stateIn operator.

stateIn을 사용하여 위 과정과 동일한 효과를 coldFlow로 부터 이끌어 낼 수 있다.

 

Strong equality-based conflation

Values in state flow are conflated using Any.equals comparison in a similar way to distinctUntilChanged operator. It is used to conflate incoming updates to value in MutableStateFlow and to suppress emission of the values to collectors when new value is equal to the previously emitted one. State flow behavior with classes that violate the contract for Any.equals is unspecified.

합성시 강력한 equality를 base로함

State flow is a shared flow

- recently explain updated at 2022.10.30

 

State flow is a special-purpose, high-performance, and efficient implementation of SharedFlow for the narrow, but widely used case of sharing a state. See the SharedFlow documentation for the basic rules, constraints, and operators that are applicable to all shared flows.

State flow는 sharing a state를 하기위해 widely used되는 SharedFlow의 special-purpose버전이다.

 

State flow always has an initial value, replays one most recent value to new subscribers, does not buffer any more values, but keeps the last emitted one, and does not support resetReplayCache. A state flow behaves identically to a shared flow when it is created with the following parameters and the distinctUntilChanged operator is applied to it:

state flow는 언제나 initial value를 가지며, 새로운 subscribers에게 one most recent value를 replay한다. buffer에 대한 개념은 없으며(목적상 필요도 없으며) last emitted one을 should keep it for replaying. 

 

// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

Use SharedFlow when you need a StateFlow with tweaks in its behavior such as extra buffering, replaying more values, or omitting the initial value. 

buffer 개수 조정등의 복잡한 케이스 구현을 하고자한다면 SharedFlow를 사용하라.

 

StateFlow vs ConflatedBroadcastChannel

Conceptually, state flow is similar to ConflatedBroadcastChannel and is designed to completely replace it. It has the following important differences:

  • StateFlow is simpler, because it does not have to implement all the Channel APIs, which allows for faster, garbage-free implementation, unlike ConflatedBroadcastChannel implementation that allocates objects on each emitted value.
  • StateFlow always has a value which can be safely read at any time via value property. Unlike ConflatedBroadcastChannel, there is no way to create a state flow without a value.
  • StateFlow has a clear separation into a read-only StateFlow interface and a MutableStateFlow.
  • StateFlow conflation is based on equality like distinctUntilChanged operator, unlike conflation in ConflatedBroadcastChannel that is based on reference identity.
  • StateFlow cannot be closed like ConflatedBroadcastChannel and can never represent a failure. All errors and completion signals should be explicitly materialized if needed.

StateFlow is designed to better cover typical use-cases of keeping track of state changes in time, taking more pragmatic design choices for the sake of convenience.

To migrate ConflatedBroadcastChannel usage to StateFlow, start by replacing usages of the ConflatedBroadcastChannel() constructor with MutableStateFlow(initialValue), using null as an initial value if you don't have one. Replace send and trySend calls with updates to the state flow's MutableStateFlow.value, and convert subscribers' code to flow operators. You can use the filterNotNull operator to mimic behavior of a ConflatedBroadcastChannel without initial value.

 

(대충 StateFlow가 더 pragmatic한 view에서 designed 되었으니까 ConflatedBroadcastChannel대신 migrate하여 StateFlow를 쓰는게 낫지않겠냐는 글)

 

6. stateIn

fun <T> Flow<T>.stateIn(scope: CoroutineScope, started: SharingStarted, initialValue: T): StateFlow<T>

Converts a coldFlow into a hotStateFlow that is started in the given coroutine scope, sharing the most recently emitted value from a single running instance of the upstream flow with multiple downstream subscribers. See the StateFlow documentation for the general concepts of state flows.

coldFlow를 hot - stateFlow로 변경하여 지정된 코루틴에서 실행하도록 변경한다.

단일 실행 인스턴스에서 가장 최근에 내보낸 값을 여러 downstream subscriber와 공유함

 

The starting of the sharing coroutine is controlled by the started parameter, as explained in the documentation for shareIn operator.

The stateIn operator is useful in situations when there is a cold flow that provides updates to the value of some state and is expensive to create and/or to maintain, but there are multiple subscribers that need to collect the most recent state value. For example, consider a flow of state updates coming from a backend over the expensive network connection, taking a lot of time to establish. Conceptually it might be implemented like this:

가장 최근 상태 값을 collect해야 하는 subscriber가 다수인 cold flow가 있는 상황에서 유용하다

val backendState: Flow<State> = flow {
    connectToBackend() // takes a lot of time
    try {
      while (true) {
          emit(receiveStateUpdateFromBackend())
      }
    } finally {
        disconnectFromBackend()
    }
}

If this flow is directly used in the application, then every time it is collected a fresh connection is established, and it will take a while before state updates start flowing. However, we can share a single connection and establish it eagerly like this:

val state: StateFlow<State> = backendMessages.stateIn(scope, SharingStarted.Eagerly, State.LOADING)

Now, a single connection is shared between all collectors from state, and there is a chance that the connection is already established by the time it is needed.

Upstream completion and error handling

Normal completion of the upstream flow has no effect on subscribers, and the sharing coroutine continues to run. If a a strategy like SharingStarted.WhileSubscribed is used, then the upstream can get restarted again. If a special action on upstream completion is needed, then an onCompletion operator can be used before the stateIn operator to emit a special value in this case. See the shareIn operator's documentation for an example.

Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers, and will be handled by the scope in which the sharing coroutine is launched. Custom exception handling can be configured by using the catch or retry operators before the stateIn operator, similarly to the shareIn operator.

 

9.Merge

common
fun <T> Iterable<Flow<T>>.merge(): Flow<T>
fun <T> merge(vararg flows: Flow<T>): Flow<T>

Merges the given flows into a single flow without preserving an order of elements. All flows are merged concurrently, without limit on the number of simultaneously collected flows.

elements의 순서없이 주어진 flow를 single flow로 merge하는 메소드

 

 

10. snapshotFlow

 

 참조  https://witcheryoon.tistory.com/342

11.onStart

kotlinx-coroutines-core/kotlinx.coroutines.flow/onStart

Returns a flow that invokes the given action before this flow starts to be collected.

The action is called before the upstream flow is started, so if it is used with a SharedFlow there is no guarantee that emissions from the upstream flow that happen inside or immediately after this onStart action will be collected (see onSubscription for an alternative operator on shared flows).

 

flow가 collected되면서 시작하기전에, 주어진 action을 invoke시킨 flow를 리턴해주는 메소드.

the upstream flow가 시작하기전에 action은 called되므로, ShareFlow와 함께 사용되는 경우 onStart action의 collected 이후의 upstream flow 로부터의 emission의 collecting은 보장되지 않을 수 있다.

 

The receiver of the action is FlowCollector, so onStart can emit additional elements. For example:

flowOf("a", "b", "c")
    .onStart { emit("Begin") }
    .collect { println(it) } // prints Begin, a, b, c

 

12. onEach

onEach 구현

Flow<T>.onEach는 upStream Flow에서 emit되는 각각의 값들이 emited 되기전에 action으로 감싸져서 동작을 invoke한 flow가 return 되도록한다.

transformed flow

 

 

 

 

  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유