Android Dev/NowInAndroid / / 2022. 11. 3. 21:21

NowInAndroid(4) - SyncWorker/SyncUtilities

 

 

Sync의 목적은 local data와 remote source의 동기화이다.

아래 설명에 대한 코드들은 대부분 아래의 경로에 구현되어있다

 

1) SyncUtilities

 

아래의 공식 설명을 참고하면 각 스탭에서 SyncWorker가 동작하는 부분이 있는 것을 확인할 수 있다.

통합적인 설명은 다음장에서 하기로하고, 일단은 SyncWorker/SyncUtilities 코드를 분석해본다.

 

Application Basic Flow

 

example of ForYou screen

 

스탭 중간중간에 Sync를 위해 메소드가 호출되는 것을 확인할 수 있다. 이를 볼 것이다.

Step Description Code
1 On app startup, a WorkManager job to sync all repositories is enqueued. SyncInitializer.create
2 The initial news feed state is set to Loading, which causes the UI to show a loading spinner on the screen. Search for usages of NewsFeedUiState.Loading
3 WorkManager executes the sync job which calls OfflineFirstNewsRepository to start synchronizing data with the remote data source. SyncWorker.doWork
4 OfflineFirstNewsRepository calls RetrofitNiaNetwork to execute the actual API request using Retrofit. OfflineFirstNewsRepository.syncWith
5 RetrofitNiaNetwork calls the REST API on the remote server. RetrofitNiaNetwork.getNewsResources
6 RetrofitNiaNetwork receives the network response from the remote server. RetrofitNiaNetwork.getNewsResources
7 OfflineFirstNewsRepository syncs the remote data with NewsResourceDao by inserting, updating or deleting data in a local Room database. OfflineFirstNewsRepository.syncWith
8 When data changes in NewsResourceDao it is emitted into the news resources data stream (which is a Flow). NewsResourceDao.getNewsResourcesStream
9 OfflineFirstNewsRepository acts as an intermediate operator on this stream, transforming the incoming PopulatedNewsResource (a database model, internal to the data layer) to the public NewsResource model which is consumed by other layers. OfflineFirstNewsRepository.getNewsResourcesStream
10 When ForYouViewModel receives the news resources it updates the feed state to Success. ForYouScreen then uses the news resources in the state to render the screen.The screen shows the newly retrieved news resources (as long as the user has chosen at least one topic or author). Search for instances of NewsFeedUiState.Success

 

 

 

1> Synchronizer Interface Marker

/**
 * Interface marker for a class that manages synchronization between local data and a remote
 * source for a [Syncable].
 */
interface Synchronizer {
    suspend fun getChangeListVersions(): ChangeListVersions

    suspend fun updateChangeListVersions(update: ChangeListVersions.() -> ChangeListVersions)

    /**
     * Syntactic sugar to call [Syncable.syncWith] while omitting the synchronizer argument
     */
    suspend fun Syncable.sync() = this@sync.syncWith(this@Synchronizer)
}

/**
 * Interface marker for a class that is synchronized with a remote source. Syncing must not be
 * performed concurrently and it is the [Synchronizer]'s responsibility to ensure this.
 */
interface Syncable {
    /**
     * Synchronizes the local database backing the repository with the network.
     * Returns if the sync was successful or not.
     */
    suspend fun syncWith(synchronizer: Synchronizer): Boolean
}

 

1. 위의 주석에서 Interface marker는 아래를 참고하라.

https://en.wikipedia.org/wiki/Marker_interface_pattern

요약 : class가 해당 언어에서 명시적으로 제공하지 않는 메타데이터를 연결하기 위한 방법으로 사용된다고 쓰여 있다.

말이 어려운데.. 사용할 어떠한 Class에 대한 Tag를 붙였다고 생각하면 된다. 즉 상속하게 될 클래스를 Synchronizer라는 용도로 사용하겠다는 이름을 붙이겠다는 것이다. 따라서 어떤 타입체크등을 하게 되었을 때 이 클래스가 as Synchronizer 인지를 확인만 하겠다는 용도로 쓰인다.(본래는 메소드에 대한 선언이 없는 것이 예시로 되어있으나 여기서는 suspend fun 3개가 선언되어있음)

 

2. Syncable 인터페이스 마커는 remote source와의 동기화를 위한 클래스를 정의할때 사용한다는 주석이 달려있다.

3. syncWith 는 네트워크 데이터를 가져와서 로컬 데이터 베이스를 동기화하는 용도로 사용하기 위한 메소드를 정의하고, true/false로 sync의 성공여부를 나타낸다.

 

2> suspendRunCatching

/**
 * Attempts [block], returning a successful [Result] if it succeeds, otherwise a [Result.Failure]
 * taking care not to break structured concurrency
 */
private suspend fun <T> suspendRunCatching(block: suspend () -> T): Result<T> = try {
    Result.success(block())
} catch (cancellationException: CancellationException) {
    throw cancellationException
} catch (exception: Exception) {
    Log.i(
        "suspendRunCatching",
        "Failed to evaluate a suspendRunCatchingBlock. Returning failure Result",
        exception
    )
    Result.failure(exception)
}

4. 위 코드를 이해하기 위해, 먼저 runCatching에 대해 살펴 보면,

아래와 같이 어떤 block에 대하여  output으로 예상되는 타입 R이 정상적으로 나오게 되었을때는 success 그렇지 않다면 failure가 호출되도록 하는 메소드이다(참고 : https://uchun.dev/runCatching%EC%9D%84-%EC%9D%B4%EC%9A%A9%ED%95%9C-kotlin%EC%97%90%EC%84%9C-exception%EC%B2%98%EB%A6%AC-%EB%B0%A9%EB%B2%95/)

 

 

runCatching Class

 

Result Class

 

success/failure method in Result

 

 위 코드를 확인해보면 알겠지만, try-catch의 편의성을 보강한 메소드이며 예제로 아래와 같이 사용하여 이해해볼 수 있다.

@Throws(Exception::class)
fun getRandomFruit(): String {
    val fruitName = listOf(
        "Avocado", "Blueberries", null,
        "Clementine", "Durian", "Guava"
    ).shuffled().first()

    return when (fruitName) {
        "Guava" -> throw IllegalStateException("Out of stock")
        null, "" -> throw NullPointerException()
        else -> fruitName
    }
}

shuffled에 의해서 list의 순서가 랜덤하게 바뀌고 여기서 1번째 element를 뽑아오는 getRandomFruit 메소드를 통해 아래와 같이 runCatching을 실행시키면

val ff1 = runCatching {
    getRandomFruit()
}

ff1이라는 변수는 랜덤하게 success가 되는 경우와 failure로 떨어지는 경우로 나눠지게 된다.

실행결과 예

이를 개선하여 asynchronous 상황에서도 동작하게 만든 것이 suspendRunCatching 메소드이다.  다른것은 없고.. block 파라미터를 suspend로 사용하도록 한 것이 핵심이다.

 

3> Synchronizer extension function

/**
 * Utility function for syncing a repository with the network.
 * [versionReader] Reads the current version of the model that needs to be synced
 * [changeListFetcher] Fetches the change list for the model
 * [versionUpdater] Updates the [ChangeListVersions] after a successful sync
 * [modelDeleter] Deletes models by consuming the ids of the models that have been deleted.
 * [modelUpdater] Updates models by consuming the ids of the models that have changed.
 *
 * Note that the blocks defined above are never run concurrently, and the [Synchronizer]
 * implementation must guarantee this.
 */
suspend fun Synchronizer.changeListSync(
    versionReader: (ChangeListVersions) -> Int,
    changeListFetcher: suspend (Int) -> List<NetworkChangeList>,
    versionUpdater: ChangeListVersions.(Int) -> ChangeListVersions,
    modelDeleter: suspend (List<String>) -> Unit,
    modelUpdater: suspend (List<String>) -> Unit,
) = suspendRunCatching {
    // Fetch the change list since last sync (akin to a git fetch)
    val currentVersion = versionReader(getChangeListVersions())
    val changeList = changeListFetcher(currentVersion)
    if (changeList.isEmpty()) return@suspendRunCatching true

    val (deleted, updated) = changeList.partition(NetworkChangeList::isDelete)

    // Delete models that have been deleted server-side
    modelDeleter(deleted.map(NetworkChangeList::id))

    // Using the change list, pull down and save the changes (akin to a git pull)
    modelUpdater(updated.map(NetworkChangeList::id))

    // Update the last synced version (akin to updating local git HEAD)
    val latestVersion = changeList.last().changeListVersion
    updateChangeListVersions {
        versionUpdater(latestVersion)
    }
}.isSuccess

1. network에서 온 데이터와 room offline data(repository)와의 싱크를 맞추는 코드

2. currentVersion을 통해 changeList를 가져오고, 변화된 리스트에는 deleted, updated 2개가 존재하는데 이를 partition으로 나누어 modelDeletermodelUpdater에 각각 id로 구성된 list를 전달한다(이는 람다식에서 삭제와 업데이트에 대한 동작을 구현해주어야함)

3. latestVersion을 가져와서 versionUpdater를 통해 버전을 업데이트하고 interface Synchronizer의 updateChangeListVersions(update: ChangeListVersions.() -> ChangeListVersions)를 통해 version을 업데이트한다.

 

위의 과정을 runCatching하고 success 여부에 대한 boolean 값을 changeListSync 확장함수로 만든 것이 위의 코드임

(여기서 updateChangeListVersionsgetChangeListVersions는 interface Synchronizer의 메소드 선언이며 구현을 해주어야한다. 이는 SyncWorker class에서 구현되어있음)

 

 

4> syncWith override fun 구현 : OfflineFirstTopicsRepository, OfflineFirstAuthorsRepository, OfflineFirstNewsRepository

 

4-1. class OfflineFirstTopicsRepository에서의 구현 

override suspend fun syncWith(synchronizer: Synchronizer): Boolean =
    synchronizer.changeListSync(
        versionReader = ChangeListVersions::topicVersion,
        changeListFetcher = { currentVersion ->
            network.getTopicChangeList(after = currentVersion)
        },
        versionUpdater = { latestVersion ->
            copy(topicVersion = latestVersion)
        },
        modelDeleter = topicDao::deleteTopics,
        modelUpdater = { changedIds ->
            val networkTopics = network.getTopics(ids = changedIds)
            topicDao.upsertTopics(
                entities = networkTopics.map(NetworkTopic::asEntity)
            )
        }
    )

1. 람다식 내에 구현해야할 익명함수들을 확인 할 수 있다.

2. DataSource 인 val network를 통해 changeListFetcher가 구현되어 있고,  이를 받아와서 버전 체크 및 model에서의 Delete와 Updator를 topicDao의 query를 호출하여 구현되어있는 것을 확인 할 수 있다.

3. modelUpdater는 network로 호출한 데이터를 upsert로 db에 넣는 방식의 형태로 구현되어있다.

4. 4-2와 4-3에서도 마찬가지 형식임을 아래와 같이 확인할 수 있다.

 

 

4-2. class OfflineFirstAuthorsRepository에서의 구현 

override suspend fun syncWith(synchronizer: Synchronizer): Boolean =
    synchronizer.changeListSync(
        versionReader = ChangeListVersions::authorVersion,
        changeListFetcher = { currentVersion ->
            network.getAuthorChangeList(after = currentVersion)
        },
        versionUpdater = { latestVersion ->
            copy(authorVersion = latestVersion)
        },
        modelDeleter = authorDao::deleteAuthors,
        modelUpdater = { changedIds ->
            val networkAuthors = network.getAuthors(ids = changedIds)
            authorDao.upsertAuthors(
                entities = networkAuthors.map(NetworkAuthor::asEntity)
            )
        }
    )

 

4-3. OfflineFirstNewsRepository 에서의 구현

override suspend fun syncWith(synchronizer: Synchronizer) =
    synchronizer.changeListSync(
        versionReader = ChangeListVersions::newsResourceVersion,
        changeListFetcher = { currentVersion ->
            network.getNewsResourceChangeList(after = currentVersion)
        },
        versionUpdater = { latestVersion ->
            copy(newsResourceVersion = latestVersion)
        },
        modelDeleter = newsResourceDao::deleteNewsResources,
        modelUpdater = { changedIds ->
            val networkNewsResources = network.getNewsResources(ids = changedIds)

            // Order of invocation matters to satisfy id and foreign key constraints!

            topicDao.insertOrIgnoreTopics(
                topicEntities = networkNewsResources
                    .map(NetworkNewsResource::topicEntityShells)
                    .flatten()
                    .distinctBy(TopicEntity::id)
            )
            authorDao.insertOrIgnoreAuthors(
                authorEntities = networkNewsResources
                    .map(NetworkNewsResource::authorEntityShells)
                    .flatten()
                    .distinctBy(AuthorEntity::id)
            )
            newsResourceDao.upsertNewsResources(
                newsResourceEntities = networkNewsResources
                    .map(NetworkNewsResource::asEntity)
            )
            newsResourceDao.insertOrIgnoreTopicCrossRefEntities(
                newsResourceTopicCrossReferences = networkNewsResources
                    .map(NetworkNewsResource::topicCrossReferences)
                    .distinct()
                    .flatten()
            )
            newsResourceDao.insertOrIgnoreAuthorCrossRefEntities(
                newsResourceAuthorCrossReferences = networkNewsResources
                    .map(NetworkNewsResource::authorCrossReferences)
                    .distinct()
                    .flatten()
            )
        }
    )

 

 

2) SyncWorker

1. 아래는 SyncWorker full code이다.

1> 알아야할 것 : 에노테이션 @HiltWorker, @AssistedInject 의미 : https://witcheryoon.tistory.com/350

2> CoroutineWorker 

/**
 * Syncs the data layer by delegating to the appropriate repository instances with
 * sync functionality.
 */
@HiltWorker
class SyncWorker @AssistedInject constructor(
    @Assisted private val appContext: Context,
    @Assisted workerParams: WorkerParameters,
    private val niaPreferences: NiaPreferencesDataSource,
    private val topicRepository: TopicsRepository,
    private val newsRepository: NewsRepository,
    private val authorsRepository: AuthorsRepository,
    @Dispatcher(IO) private val ioDispatcher: CoroutineDispatcher,
) : CoroutineWorker(appContext, workerParams), Synchronizer {

    override suspend fun getForegroundInfo(): ForegroundInfo =
        appContext.syncForegroundInfo()

    override suspend fun doWork(): Result = withContext(ioDispatcher) {
        traceAsync("Sync", 0) {
            // First sync the repositories in parallel
            val syncedSuccessfully = awaitAll(
                async { topicRepository.sync() },
                async { authorsRepository.sync() },
                async { newsRepository.sync() },
            ).all { it }

            if (syncedSuccessfully) Result.success()
            else Result.retry()
        }
    }

    override suspend fun getChangeListVersions(): ChangeListVersions =
        niaPreferences.getChangeListVersions()

    override suspend fun updateChangeListVersions(
        update: ChangeListVersions.() -> ChangeListVersions
    ) = niaPreferences.updateChangeListVersion(update)

    companion object {
        /**
         * Expedited one time work to sync data on app startup
         */
        fun startUpSyncWork() = OneTimeWorkRequestBuilder<DelegatingWorker>()
            .setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
            .setConstraints(SyncConstraints)
            .setInputData(SyncWorker::class.delegatedData())
            .build()
    }
}

3> SyncWorker는 DataSourceRepository가 핵심이고, context, workerParams, Dispatcher에 대한 파라미터를 갖고 있다. CoroutineWorker에 context와 workerparams을 전달하고, Sysnchronizing method을 위해 Synchronizer를 상속하고 있다. (WorkMananger / CoroutineWorker에 대한 설명  https://developer.android.com/topic/libraries/architecture/workmanager/advanced/threading이며 결론적으로 CorotuineWorker에 대한 사용은 doWork()를 override하여 구현하면된다)

 

4> traceAsync 를 통해 method name "Sync"로 trace를 추적한다(동작과는 무관하게 디버깅 용도로 태그 "Sync"를 넣기 위함인 것같은데.. 추후 다시 살펴봐야겠음)

 

// First sync the repositories in parallel
val syncedSuccessfully = awaitAll(
    async { topicRepository.sync() },
    async { authorsRepository.sync() },
    async { newsRepository.sync() },
).all { it }

 위 부분은  async 블록으로 Coroutine를 만든 뒤, 각 repository에서 위에서 언급한 SyncUtilities의 Synchronizer .sync()를 사용하여 각 레포지토리의 syncing을 해준다. 다시 말해, 모든 repository는 Syncable 를 상속하고 있고, 이에 대한 extension method인 sync를 통해, syncWith를 호출시켜 각 레포지토리에서 싱크를 위해 override된 changeListFetcher, versionUpdater, modelDeleter, modelUpdater를 execute하고 suspendRunCatching을 하도록한다.

 

위에 정의된 것들이 .all에 의해 successfully collected되면 Result.success()를 반환하고 아니면 Result.retry()를 traceAsync에 반환한다.

 

 

 

updateChangeListVersions 동작에 대한 부연 설명 : 

https://witcheryoon.tistory.com/345

 

  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유