Coroutines/Flow/Suspend fun Design Trial

Trial List
예제0 : Flow basic
예제1 : Flow basic2
예제2 : SharedFlow basic
예제3 : Coroutine basic
예제4 : Coroutine Flow
예제5 : Coroutine flow for Testing(updated at 2022-11-24)
예제6 : Suspend function안에 async methods를 parallelly operating (updated at 2022-12-5)
예제7 : Custom Deferred Method using CompletableDeferred at Suspend function (updated at 2022-12-6)

 

예제0 : Flow basic

MockServer에 n개의 RequestValue를 요청
RequestValue는 Random(1초 ~ 2초) 지연 후 응답이 도착
이를 처리하는데 1초 ~ 3초 지연 후 처리 완료 되는 coroutine flow를 작성

object MockServer {

    data class DataBox(
        val uid : UUID = UUID.randomUUID(),
        val durationTime : Long = 0L,
        val generatedNum : Int = 0
    )

    fun requestValue(requestNumber : Int) : Flow<DataBox> = flow {
        for(i in 1..requestNumber)
        {
            val durationTime = (1000..2000).random()
            val generatedNum = (1..20).random()
            delay(durationTime.toLong())
            emit(DataBox(generatedNum = generatedNum, durationTime = durationTime.toLong()))
        }
    }

}

실행하는 곳에서

        lifecycleScope.launch(Dispatchers.Main) {
            MockServer.requestValue(10).collect { aBox ->
                delay((1000..3000).random().toLong())
                println("${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
            }
        }

결과 :

I/System.out: 41984c63-0427-48f9-a120-2c6ff8a20392 / 1329: output -> 12
I/System.out: 9a73d516-f906-436e-a4b0-592087668305 / 1867: output -> 3
I/System.out: bc56cf1e-85c9-4772-912d-4f23f9216576 / 1457: output -> 4
I/System.out: 9eaebb9e-d0d0-4dbf-887c-f3da4d1eedb4 / 1691: output -> 20
I/System.out: 0c2fa258-7938-4374-8536-0c3b13a78b10 / 1465: output -> 5
I/System.out: d8f10a2a-c1dd-4638-ae80-bb4a11ed9ff7 / 1137: output -> 11
I/System.out: 2a8ba605-b5b7-441d-afb4-3f6ba8f9298c / 1804: output -> 8
I/System.out: b8631642-be61-4dbc-983d-0fcb5fadbce4 / 1804: output -> 3
I/System.out: ccd4e01d-0607-45e8-b558-13efa6d45067 / 1729: output -> 20
I/System.out: 7bfd420e-cfb9-4a86-9309-57f08096ac72 / 1528: output -> 5

 

예제1 : Flow basic2

MockServer에 3개의 RequestValue를 병렬 요청(요청 후 응답 딜레이는 랜덤)
RequestValue response 3개를 조합하여 값을 printing하는 coroutine flow를 작성

lifecycleScope.launch(Dispatchers.Main) {
    repeat(3){ number ->
        val a = mutableListOf<MockServer.DataBox>()
        val x = launch {
            MockServer.requestValue(1).collect { aBox ->
                delay((1000..5000).random().toLong())
                println("x // ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
                a += aBox
            }
        }
        val y = launch {
            MockServer.requestValue(1).collect { aBox ->
                delay((2000..3000).random().toLong())
                println("y // ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
                a += aBox
            }
        }
        val z = launch {
            MockServer.requestValue(1).collect { aBox ->
                delay((1000..7000).random().toLong())
                println("z // ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
                a += aBox
            }
        }
        listOf(x,y,z).joinAll()
        val p = a.fold(0) { acc, dataBox ->
            acc + dataBox.generatedNum
        }
        println("trial num : $number : $p")
    }
}

결과

I/System.out: y // cb32aff6-6385-4b93-a401-9abf078d73f8 / 1148: output -> 8
I/System.out: z // 4ab8866f-8399-47e5-a4ae-9f0749f60c2a / 1928: output -> 2
I/System.out: x // 245800af-792c-4027-9ed3-d847d4f2f2bb / 1026: output -> 9
I/System.out: trial num : 0 : 19
I/System.out: x // dfa1ecf8-e08b-4167-9e36-398b9c82b7b2 / 1050: output -> 9
I/System.out: y // dff96f61-09b3-4407-93a9-98864a7966a3 / 1450: output -> 16
I/System.out: z // 77bddfcb-2f24-4e2f-af4d-b3233492c412 / 1793: output -> 11
I/System.out: trial num : 1 : 36
I/System.out: y // 627f6131-bf4d-42e2-ad08-1a691b4c75a5 / 1680: output -> 1
I/System.out: x // 17458421-8e57-472d-9f48-9275da22fa11 / 1450: output -> 10
I/System.out: z // 559dbfda-7838-46d7-a467-b91ecd3beb83 / 1450: output -> 10
I/System.out: trial num : 2 : 21

 

예제2 : SharedFlow basic

MockServer에 n개의 RequestValue를 요청(요청 후 응답 딜레이는 랜덤)
RequestValue response 를 subscribing하는 2개의 collector는 시간차를 가지고 collecting함

옵션1 : delayed collector가 요청값을 collecting하는 시점에 모두 printing하는 케이스

옵션2 : delayed collector가 요청값을 collecting하는 시점부터 printing하는 케이스

옵션1 케이스

 lifecycleScope.launch(Dispatchers.Main) {

            val requestNum = 5
            val coldFlow: Flow<MockServer.DataBox> = MockServer.requestValue(requestNum)
            val hotFlow: SharedFlow<MockServer.DataBox> = coldFlow.shareIn(
                scope = CoroutineScope(Dispatchers.Main),
                started = SharingStarted.Eagerly,
                replay = requestNum)

            launch {
                hotFlow.collect { aBox ->
//                    delay((1000..3000).random().toLong())
                    println("hotFlowA : ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
                }
            }

            launch {
                delay(7000)
                hotFlow.collect{ aBox ->
                    println("hotFlowB : ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
                }
            }
        }

결과 :

71,48,57이 나올때까지 hotFlowB는 collecting하고 있지 않다가 7000ms가 지난 이후 한번에 71,48,57을 collecting하고 emit하는 값을 subscribe하는 상황

I/System.out: hotFlowA : 41308695-0e6a-440e-9f1f-4863d279d543 / 1493: output -> 71
I/System.out: hotFlowA : c7416988-4f59-4edb-ab28-ff9e0499af85 / 1428: output -> 48
I/System.out: hotFlowA : e1b2d051-e3de-4e2b-831a-66b8543e003b / 1299: output -> 57
I/System.out: hotFlowB : 41308695-0e6a-440e-9f1f-4863d279d543 / 1493: output -> 71
I/System.out: hotFlowB : c7416988-4f59-4edb-ab28-ff9e0499af85 / 1428: output -> 48
I/System.out: hotFlowB : e1b2d051-e3de-4e2b-831a-66b8543e003b / 1299: output -> 57
I/System.out: hotFlowA : 01a15220-89c5-4d38-bb50-49e837f5c2bc / 1077: output -> 16
I/System.out: hotFlowB : 01a15220-89c5-4d38-bb50-49e837f5c2bc / 1077: output -> 16
I/System.out: hotFlowA : a2fe617c-983a-4d99-90a0-7c26794e2672 / 1561: output -> 34
I/System.out: hotFlowB : a2fe617c-983a-4d99-90a0-7c26794e2672 / 1561: output -> 34

옵션2 케이스

replay가 없다면(replay = 0) cached되지 않으므로 아래와 같은 결과가 나온다

hotFlowB는 앞의 41,58에 대한 값 없이, 7000ms 지연이후의 가장 최근의 값 64 부터 subscribe한다

I/System.out: hotFlowA : b9d8ffe2-1cce-478d-ade2-cd4cf1f372d7 / 1993: output -> 41
I/System.out: hotFlowA : 51f9a6c5-661f-4d4e-bc11-28a9df013b25 / 1578: output -> 58
I/System.out: hotFlowA : c0402535-f51d-49f9-ba84-fc1419f504e8 / 1721: output -> 64
I/System.out: hotFlowB : c0402535-f51d-49f9-ba84-fc1419f504e8 / 1721: output -> 64
I/System.out: hotFlowA : 724e53cd-d9bd-4425-9487-76c5d9c14d5c / 1528: output -> 5
I/System.out: hotFlowB : 724e53cd-d9bd-4425-9487-76c5d9c14d5c / 1528: output -> 5
I/System.out: hotFlowA : ce4bdd88-4733-46ce-9b37-7c90a3aafaf8 / 1971: output -> 51
I/System.out: hotFlowB : ce4bdd88-4733-46ce-9b37-7c90a3aafaf8 / 1971: output -> 51

 

예제3 : Coroutine basic

elements 8개가 각 process delay를 가진채 task를 수행하며, 모든 일이 끝났을 때 "함수종료"를 print

suspend fun doSomething() = coroutineScope {
val listShower = listOf<Int>(1,2,3,4,5,6,7,8)
val jobs = listShower.map{
println("$it 의 동작")
launch {
withContext(Dispatchers.IO) {
val ranNum = kotlin.math.floor(Math.random() * 10000)
delay(ranNum.toLong())
println("$it 의 $ranNum 처리 완료")
}
}
}
jobs.forEach { it.join() }
println("함수 종료")
}

val job = GlobalScope.launch {
doSomething()
}

예제4. Coroutine launch in Android

 viewModelScope.launch(/*Dispatchers.Main*/) {
                LoginKit().getEssentialData(userViewmodel = userViewModel) { success ->
                    if(success){ // 성공
//                            val intent = Intent(context, M1Mainstream::class.java)
//                            context.startActivity(intent)
                        (context as Activity).finish()
                    }else{ // 실패
                        Thread(kotlinx.coroutines.Runnable {
                            Toast.makeText(
                                context,
                                "기본 정보를 설정하는데 실패했습니다. 다시 로그인 해주세요",
                                Toast.LENGTH_SHORT
                            ).show()
                        })
                    }
                }
            }
suspend fun getEssentialData(userViewmodel : UserViewModel, success : (Boolean) -> Unit) = withContext(Dispatchers.IO) {
    val userBasic = async { getUserBasic(userViewmodel) }
    launch { updateImg(userViewmodel) }.join()
    when(val a = userBasic.await()){
        is FirebaseDataResult.Success<UserPublic> ->{
            userViewmodel.setUserBasicData(a.data)
            success(true)
        }
        is FirebaseDataResult.Error ->{
            success(false)
        }
    }
}

 

예제4 : Coroutine Flow

아래 같이 List 형태로 구성된 요소들이 각각 query를 던진후 랜덤한 딜레이 후 리턴 받은 반환값을 ArrayList에 넣어서 emit하는 Flow

1. 호출 부분 : Compose에서 CoroutineContext로 들어가기 위해 LaunchedEffect를 사용

LaunchedEffect(key1 = delayRunner) {
    firebaseNicknameManagerFlow(fidList = keyList, refresh = false).collect{
        val vv = 1
    }
}

2.  Suspend fun 구현. 해당 부분은 위 그림에서 Query를 호출하는 부분이다. map에 의해 전달 받은 모든 요소가 parallel하게 firestoreUserQuery를 호출하고 이에 대한 반환 값을 nickArrayList에서 받아서 add하게 된다.

suspend fun firebaseNicknameManagerFlow(fidList : List<String>, refresh : Boolean = false) : Flow<ArrayList<String>> = flow {
    val nickArrayList : ArrayList<String> = ArrayList()
    fidList.map{ aFid ->
        if(nicknameList[aFid] == null || nicknameList[aFid] == "" || refresh) //리프레시 일 경우에도 이 조건문으로 들어온다
        {
            nickArrayList.add(firestoreUserQuery(aFid,"nickname"))
        }
        else //기존 닉네임을 사용
        {
            nickArrayList.add(nicknameList[aFid].toString())
        }
    }

    emit(nickArrayList)
}

3. 딜레이가 존재하는 Query 함수의 실제 구현 예제

userDoc이 올때까지 await로 기다린 후 값을 받게 되면 return하게 된다.

suspend fun firestoreUserQuery(fid : String, target : String):String {

    val userDoc = FirebaseFirestore.getInstance().collection("user_basic").document(fid).get().await()

    return if (userDoc.data != null)
    {
         userDoc.data!![target].toString()
    }
    else "유져없음"

}

 

예제5 : Coroutine flow for Testing(updated at 2022-11-24)

Unit Test를 하게 될 경우 collect 를 하게 되면 코루틴이 종료되지 않고, collecting을 대기하고 있는 경우가 생길 수 있다. 해당 경우는 다음과 같이 처리한다.

 

1. first() method를 이용하여 flow cancellation을 유도한다.

val gg: List<NetworkBulletin> = dataSource.getBulletinByTimestamp(1507593600L*3).first()

 

2.  parralel launch를 두고 일정 polling time을 가진 뒤 job을 cancel() call을 하여 coroutine을 종료한다.

val dataSourceJob: Job =
    launch {  dataSource.getBulletinByTimestamp(1507593600L*3)

        .collect {
            getResult = it
        }
    }
launch {
    delay(5000L)
    dataSourceJob.cancel()
}

 

예제6 : Suspend function안에 async methods를 parallelly operating (updated at 2022-12-5)

CoroutineScope 블록 안에  다수의 launch 블록을 열고서 각각의 async method를 넣어주면된다.
혹은 async { } 블록을 열고 deferred를 관리하여 순서를 조정할 수도 있다

예제는 아래와 같음.

suspend operator fun invoke(fid : String) =
    coroutineScope {
        launch {
            //1. sync Firebase to DB + attaching ChatRoom Snapshot Listener
            chatRoomRepository.syncChatRoomtoDb(fid = fid)
        }
        launch {
            //2. load ChatRoomList from DB
            chatRoomRepository.loadChatRoomfromDb().collect { chatRoomList ->
                chatRoomList.forEach { aChatRoom ->
                    //3. attaching ChatMsgs Snapshot Listener
                    chatRepository.syncFirebaseToRoomDb(432000_000, aChatRoom.chatRoomId).collect()
                }
            }
        }
}

 

예제7 : Custom Deferred Method using CompletableDeferred at Suspend function (updated at 2022-12-6)

 

Suspend function 안에 코드 실행이 suspending 이 되는 경우 이를 CompletableDeferred를 사용하여 awaiting 시킬 수 있다.

1. 먼저 completableDeferred를 만들어 주고 await를 호출하면, 코드 실행을 pause 시킬 수 있다.

2. suspending이 해제 되는 시점에 맞추어(예를 들어 call back response가 오는 시점) deferred를 complete 시켜준다.

3. await() 이후의 코드가 다시 re-executing된다.

CompletableDeferred를 사용한 suspending function control

 

예시코드 : 

아래 예제는,

callback인 addOnSuccessListener lambda 블록 에 initialized를 complete시켜주어 await에 의한 suspending을 해제 시켜준 상황이다

override suspend fun makeChatRoom(
    networkChatRoom: NetworkChatRoom,
    response : (String) -> Unit)
{
    val initialized = CompletableDeferred<Unit>()
    val eventsDocument = firebase.collection("chatRoom").document()
    networkChatRoom.chatRoomId = eventsDocument.id

    eventsDocument.set(networkChatRoom, SetOptions.merge())
        .addOnSuccessListener {
            response("success")
            initialized.complete(Unit)
        }
        .addOnFailureListener {
            response("failure")
            initialized.complete(Unit)
        }.await()

    initialized.await()

}
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유