Trial List
예제0 : Flow basic
예제1 : Flow basic2
예제2 : SharedFlow basic
예제3 : Coroutine basic
예제4 : Coroutine Flow
예제5 : Coroutine flow for Testing(updated at 2022-11-24)
예제6 : Suspend function안에 async methods를 parallelly operating (updated at 2022-12-5)
예제7 : Custom Deferred Method using CompletableDeferred at Suspend function (updated at 2022-12-6)
예제0 : Flow basic
MockServer에 n개의 RequestValue를 요청
RequestValue는 Random(1초 ~ 2초) 지연 후 응답이 도착
이를 처리하는데 1초 ~ 3초 지연 후 처리 완료 되는 coroutine flow를 작성
object MockServer {
data class DataBox(
val uid : UUID = UUID.randomUUID(),
val durationTime : Long = 0L,
val generatedNum : Int = 0
)
fun requestValue(requestNumber : Int) : Flow<DataBox> = flow {
for(i in 1..requestNumber)
{
val durationTime = (1000..2000).random()
val generatedNum = (1..20).random()
delay(durationTime.toLong())
emit(DataBox(generatedNum = generatedNum, durationTime = durationTime.toLong()))
}
}
}
실행하는 곳에서
lifecycleScope.launch(Dispatchers.Main) {
MockServer.requestValue(10).collect { aBox ->
delay((1000..3000).random().toLong())
println("${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
}
}
결과 :
I/System.out: 41984c63-0427-48f9-a120-2c6ff8a20392 / 1329: output -> 12
I/System.out: 9a73d516-f906-436e-a4b0-592087668305 / 1867: output -> 3
I/System.out: bc56cf1e-85c9-4772-912d-4f23f9216576 / 1457: output -> 4
I/System.out: 9eaebb9e-d0d0-4dbf-887c-f3da4d1eedb4 / 1691: output -> 20
I/System.out: 0c2fa258-7938-4374-8536-0c3b13a78b10 / 1465: output -> 5
I/System.out: d8f10a2a-c1dd-4638-ae80-bb4a11ed9ff7 / 1137: output -> 11
I/System.out: 2a8ba605-b5b7-441d-afb4-3f6ba8f9298c / 1804: output -> 8
I/System.out: b8631642-be61-4dbc-983d-0fcb5fadbce4 / 1804: output -> 3
I/System.out: ccd4e01d-0607-45e8-b558-13efa6d45067 / 1729: output -> 20
I/System.out: 7bfd420e-cfb9-4a86-9309-57f08096ac72 / 1528: output -> 5
예제1 : Flow basic2
MockServer에 3개의 RequestValue를 병렬 요청(요청 후 응답 딜레이는 랜덤)
RequestValue response 3개를 조합하여 값을 printing하는 coroutine flow를 작성
lifecycleScope.launch(Dispatchers.Main) {
repeat(3){ number ->
val a = mutableListOf<MockServer.DataBox>()
val x = launch {
MockServer.requestValue(1).collect { aBox ->
delay((1000..5000).random().toLong())
println("x // ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
a += aBox
}
}
val y = launch {
MockServer.requestValue(1).collect { aBox ->
delay((2000..3000).random().toLong())
println("y // ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
a += aBox
}
}
val z = launch {
MockServer.requestValue(1).collect { aBox ->
delay((1000..7000).random().toLong())
println("z // ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
a += aBox
}
}
listOf(x,y,z).joinAll()
val p = a.fold(0) { acc, dataBox ->
acc + dataBox.generatedNum
}
println("trial num : $number : $p")
}
}
결과
I/System.out: y // cb32aff6-6385-4b93-a401-9abf078d73f8 / 1148: output -> 8
I/System.out: z // 4ab8866f-8399-47e5-a4ae-9f0749f60c2a / 1928: output -> 2
I/System.out: x // 245800af-792c-4027-9ed3-d847d4f2f2bb / 1026: output -> 9
I/System.out: trial num : 0 : 19
I/System.out: x // dfa1ecf8-e08b-4167-9e36-398b9c82b7b2 / 1050: output -> 9
I/System.out: y // dff96f61-09b3-4407-93a9-98864a7966a3 / 1450: output -> 16
I/System.out: z // 77bddfcb-2f24-4e2f-af4d-b3233492c412 / 1793: output -> 11
I/System.out: trial num : 1 : 36
I/System.out: y // 627f6131-bf4d-42e2-ad08-1a691b4c75a5 / 1680: output -> 1
I/System.out: x // 17458421-8e57-472d-9f48-9275da22fa11 / 1450: output -> 10
I/System.out: z // 559dbfda-7838-46d7-a467-b91ecd3beb83 / 1450: output -> 10
I/System.out: trial num : 2 : 21
예제2 : SharedFlow basic
MockServer에 n개의 RequestValue를 요청(요청 후 응답 딜레이는 랜덤)
RequestValue response 를 subscribing하는 2개의 collector는 시간차를 가지고 collecting함
옵션1 : delayed collector가 요청값을 collecting하는 시점에 모두 printing하는 케이스
옵션2 : delayed collector가 요청값을 collecting하는 시점부터 printing하는 케이스
옵션1 케이스
lifecycleScope.launch(Dispatchers.Main) {
val requestNum = 5
val coldFlow: Flow<MockServer.DataBox> = MockServer.requestValue(requestNum)
val hotFlow: SharedFlow<MockServer.DataBox> = coldFlow.shareIn(
scope = CoroutineScope(Dispatchers.Main),
started = SharingStarted.Eagerly,
replay = requestNum)
launch {
hotFlow.collect { aBox ->
// delay((1000..3000).random().toLong())
println("hotFlowA : ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
}
}
launch {
delay(7000)
hotFlow.collect{ aBox ->
println("hotFlowB : ${aBox.uid} / ${aBox.durationTime}: output -> ${aBox.generatedNum}")
}
}
}
결과 :
71,48,57이 나올때까지 hotFlowB는 collecting하고 있지 않다가 7000ms가 지난 이후 한번에 71,48,57을 collecting하고 emit하는 값을 subscribe하는 상황
I/System.out: hotFlowA : 41308695-0e6a-440e-9f1f-4863d279d543 / 1493: output -> 71
I/System.out: hotFlowA : c7416988-4f59-4edb-ab28-ff9e0499af85 / 1428: output -> 48
I/System.out: hotFlowA : e1b2d051-e3de-4e2b-831a-66b8543e003b / 1299: output -> 57
I/System.out: hotFlowB : 41308695-0e6a-440e-9f1f-4863d279d543 / 1493: output -> 71
I/System.out: hotFlowB : c7416988-4f59-4edb-ab28-ff9e0499af85 / 1428: output -> 48
I/System.out: hotFlowB : e1b2d051-e3de-4e2b-831a-66b8543e003b / 1299: output -> 57
I/System.out: hotFlowA : 01a15220-89c5-4d38-bb50-49e837f5c2bc / 1077: output -> 16
I/System.out: hotFlowB : 01a15220-89c5-4d38-bb50-49e837f5c2bc / 1077: output -> 16
I/System.out: hotFlowA : a2fe617c-983a-4d99-90a0-7c26794e2672 / 1561: output -> 34
I/System.out: hotFlowB : a2fe617c-983a-4d99-90a0-7c26794e2672 / 1561: output -> 34
옵션2 케이스
replay가 없다면(replay = 0) cached되지 않으므로 아래와 같은 결과가 나온다
hotFlowB는 앞의 41,58에 대한 값 없이, 7000ms 지연이후의 가장 최근의 값 64 부터 subscribe한다
I/System.out: hotFlowA : b9d8ffe2-1cce-478d-ade2-cd4cf1f372d7 / 1993: output -> 41
I/System.out: hotFlowA : 51f9a6c5-661f-4d4e-bc11-28a9df013b25 / 1578: output -> 58
I/System.out: hotFlowA : c0402535-f51d-49f9-ba84-fc1419f504e8 / 1721: output -> 64
I/System.out: hotFlowB : c0402535-f51d-49f9-ba84-fc1419f504e8 / 1721: output -> 64
I/System.out: hotFlowA : 724e53cd-d9bd-4425-9487-76c5d9c14d5c / 1528: output -> 5
I/System.out: hotFlowB : 724e53cd-d9bd-4425-9487-76c5d9c14d5c / 1528: output -> 5
I/System.out: hotFlowA : ce4bdd88-4733-46ce-9b37-7c90a3aafaf8 / 1971: output -> 51
I/System.out: hotFlowB : ce4bdd88-4733-46ce-9b37-7c90a3aafaf8 / 1971: output -> 51
예제3 : Coroutine basic
elements 8개가 각 process delay를 가진채 task를 수행하며, 모든 일이 끝났을 때 "함수종료"를 print
suspend fun doSomething() = coroutineScope {
val listShower = listOf<Int>(1,2,3,4,5,6,7,8)
val jobs = listShower.map{
println("$it 의 동작")
launch {
withContext(Dispatchers.IO) {
val ranNum = kotlin.math.floor(Math.random() * 10000)
delay(ranNum.toLong())
println("$it 의 $ranNum 처리 완료")
}
}
}
jobs.forEach { it.join() }
println("함수 종료")
}
val job = GlobalScope.launch {
doSomething()
}
예제4. Coroutine launch in Android
viewModelScope.launch(/*Dispatchers.Main*/) {
LoginKit().getEssentialData(userViewmodel = userViewModel) { success ->
if(success){ // 성공
// val intent = Intent(context, M1Mainstream::class.java)
// context.startActivity(intent)
(context as Activity).finish()
}else{ // 실패
Thread(kotlinx.coroutines.Runnable {
Toast.makeText(
context,
"기본 정보를 설정하는데 실패했습니다. 다시 로그인 해주세요",
Toast.LENGTH_SHORT
).show()
})
}
}
}
suspend fun getEssentialData(userViewmodel : UserViewModel, success : (Boolean) -> Unit) = withContext(Dispatchers.IO) {
val userBasic = async { getUserBasic(userViewmodel) }
launch { updateImg(userViewmodel) }.join()
when(val a = userBasic.await()){
is FirebaseDataResult.Success<UserPublic> ->{
userViewmodel.setUserBasicData(a.data)
success(true)
}
is FirebaseDataResult.Error ->{
success(false)
}
}
}
예제4 : Coroutine Flow
아래 같이 List 형태로 구성된 요소들이 각각 query를 던진후 랜덤한 딜레이 후 리턴 받은 반환값을 ArrayList에 넣어서 emit하는 Flow
1. 호출 부분 : Compose에서 CoroutineContext로 들어가기 위해 LaunchedEffect를 사용
LaunchedEffect(key1 = delayRunner) {
firebaseNicknameManagerFlow(fidList = keyList, refresh = false).collect{
val vv = 1
}
}
2. Suspend fun 구현. 해당 부분은 위 그림에서 Query를 호출하는 부분이다. map에 의해 전달 받은 모든 요소가 parallel하게 firestoreUserQuery를 호출하고 이에 대한 반환 값을 nickArrayList에서 받아서 add하게 된다.
suspend fun firebaseNicknameManagerFlow(fidList : List<String>, refresh : Boolean = false) : Flow<ArrayList<String>> = flow {
val nickArrayList : ArrayList<String> = ArrayList()
fidList.map{ aFid ->
if(nicknameList[aFid] == null || nicknameList[aFid] == "" || refresh) //리프레시 일 경우에도 이 조건문으로 들어온다
{
nickArrayList.add(firestoreUserQuery(aFid,"nickname"))
}
else //기존 닉네임을 사용
{
nickArrayList.add(nicknameList[aFid].toString())
}
}
emit(nickArrayList)
}
3. 딜레이가 존재하는 Query 함수의 실제 구현 예제
userDoc이 올때까지 await로 기다린 후 값을 받게 되면 return하게 된다.
suspend fun firestoreUserQuery(fid : String, target : String):String {
val userDoc = FirebaseFirestore.getInstance().collection("user_basic").document(fid).get().await()
return if (userDoc.data != null)
{
userDoc.data!![target].toString()
}
else "유져없음"
}
예제5 : Coroutine flow for Testing(updated at 2022-11-24)
Unit Test를 하게 될 경우 collect 를 하게 되면 코루틴이 종료되지 않고, collecting을 대기하고 있는 경우가 생길 수 있다. 해당 경우는 다음과 같이 처리한다.
1. first() method를 이용하여 flow cancellation을 유도한다.
val gg: List<NetworkBulletin> = dataSource.getBulletinByTimestamp(1507593600L*3).first()
2. parralel launch를 두고 일정 polling time을 가진 뒤 job을 cancel() call을 하여 coroutine을 종료한다.
val dataSourceJob: Job =
launch { dataSource.getBulletinByTimestamp(1507593600L*3)
.collect {
getResult = it
}
}
launch {
delay(5000L)
dataSourceJob.cancel()
}
예제6 : Suspend function안에 async methods를 parallelly operating (updated at 2022-12-5)
CoroutineScope 블록 안에 다수의 launch 블록을 열고서 각각의 async method를 넣어주면된다.
혹은 async { } 블록을 열고 deferred를 관리하여 순서를 조정할 수도 있다
예제는 아래와 같음.
suspend operator fun invoke(fid : String) =
coroutineScope {
launch {
//1. sync Firebase to DB + attaching ChatRoom Snapshot Listener
chatRoomRepository.syncChatRoomtoDb(fid = fid)
}
launch {
//2. load ChatRoomList from DB
chatRoomRepository.loadChatRoomfromDb().collect { chatRoomList ->
chatRoomList.forEach { aChatRoom ->
//3. attaching ChatMsgs Snapshot Listener
chatRepository.syncFirebaseToRoomDb(432000_000, aChatRoom.chatRoomId).collect()
}
}
}
}
예제7 : Custom Deferred Method using CompletableDeferred at Suspend function (updated at 2022-12-6)
Suspend function 안에 코드 실행이 suspending 이 되는 경우 이를 CompletableDeferred를 사용하여 awaiting 시킬 수 있다.
1. 먼저 completableDeferred를 만들어 주고 await를 호출하면, 코드 실행을 pause 시킬 수 있다.
2. suspending이 해제 되는 시점에 맞추어(예를 들어 call back response가 오는 시점) deferred를 complete 시켜준다.
3. await() 이후의 코드가 다시 re-executing된다.
예시코드 :
아래 예제는,
callback인 addOnSuccessListener lambda 블록 에 initialized를 complete시켜주어 await에 의한 suspending을 해제 시켜준 상황이다
override suspend fun makeChatRoom(
networkChatRoom: NetworkChatRoom,
response : (String) -> Unit)
{
val initialized = CompletableDeferred<Unit>()
val eventsDocument = firebase.collection("chatRoom").document()
networkChatRoom.chatRoomId = eventsDocument.id
eventsDocument.set(networkChatRoom, SetOptions.merge())
.addOnSuccessListener {
response("success")
initialized.complete(Unit)
}
.addOnFailureListener {
response("failure")
initialized.complete(Unit)
}.await()
initialized.await()
}
'Programming Theory > 코루틴(Coruotine)' 카테고리의 다른 글
Coroutines guide - android coroutine API (0) | 2022.10.30 |
---|---|
Coroutines guide, kotlinx-coroutines-core / flow (0) | 2022.02.01 |
Coroutines guide + Flow 정리(2022) (0) | 2022.01.22 |
kotlin Coroutine 정리(4, Coroutine Context and Dispatchers) (0) | 2020.09.22 |
kotlin Coroutine 정리(3, Composing Suspending Functions) (0) | 2020.09.21 |