【Android】kotlinx-coroutines-rx2の使い方!RxJava → Kotlin Coroutinesへの置き換え
この記事からわかること
- Android Studio/KotlinでRxJavaの使い方
- Kotlin Coroutinesへの置き換え
- kotlinx-coroutines-rx2ライブラリの実装する方法
- awaitやasFlowメソッドの役割
index
[open]
\ アプリをリリースしました /
友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-
posted withアプリーチ
環境
- Android Studio:Flamingo
- Kotlin:1.8.20
- RxJava:2系
RxJava → Kotlin Coroutinesへの置き換え
RxJavaを使用して実装されたコードをKotlin Coroutinesへ置き換えたい場合に活用できるのが「kotlinx-coroutines-rx2」ライブラリです。RxJavaのコードはやはり独特で直感的でない分、学習コストも高いのが難点です。Kotlin Coroutinesなら比較的見通しの良いコードで非同期処理を実装できるので最近はKotlin Coroutinesを利用する案件も増えてきたように思います。
RxJava
RxJavaはReactiveX(Reactive Extensions)と呼ばれるライブラリのJava版。「ストリーム」と呼ばれるデータの流れを基本として操作することができる。
Kotlin Coroutines
Kotlin Coroutinesは非同期処理を実装できる公式ライブラリ。「コルーチン」という単位で非同期処理が管理、操作することができる。
kotlinx-coroutines-rx2
公式リファレンス:kotlinx-coroutines-rx2
「kotlinx-coroutines-rx2」はKotlin CoroutinesとRxJava2を相互運用できる機能を提供するライブラリです。具体的にはRxJavaのObservableやFlowableなどのRx型をKotlin Coroutinesのsuspend関数やフローに変換したり、その逆の変換をすることができます。
RxJava → Kotlin Coroutines
ここからは「RxJava → Kotlin Coroutines」に変換できる機能を紹介していきます。
awaitメソッド:Completable → suspend
await
メソッドはRxのCompletableをCoroutinesに変更するためのメソッドです。Completable
の完了またはエラーの通知が来るまで待機します。
例えば以下のようにCompletable
を返すgetCompletable
が定義してあったとします。
private fun getCompletable(success: Boolean): Completable =
Completable.create { emitter ->
if (success) {
emitter.onComplete()
} else {
emitter.onError(Exception("Completable error occurred"))
}
}
getCompletable
をsuspend関数にするためにawait
を使用することでCompletable
の結果を待機して結果を返したりすることができます。onError
が流れた際はawait
が例外をスローするのでtry〜catch
文で捕捉できます。
suspend fun runCompletable(): Boolean {
return withContext(Dispatchers.IO) {
try {
getCompletable(true).await()
println("Completable completed successfully")
true
} catch (e: Exception) {
e.printStackTrace()
println("Error occurred: ${e.message}")
false
}
}
}
呼び出す時は以下のような感じになります。
GlobalScope.launch {
val result = runCompletable()
println("result: $result")
}
awaitFirst:Observable → suspend
Observable
をCoroutinesに変更したい場合はawaitFirst
などいくつか用意されています。
awaitFirst
最初の要素を取得するまで待機。要素が発行されない場合はエラー
awaitLast
最後の要素を取得するまで待機。要素が発行されない場合はエラー
awaitSingle
要素が1つ流れた後に完了を取得するまで待機。要素が発行されないまたは複数の要素が流れる場合はエラー
awaitFirstOrNull
最初の要素を取得するまで待機。要素が発行されない場合はnullを返す
awaitFirstOrElse
最初の要素を取得するまで待機。要素が発行されないデフォルト値(ラムダ式を使用可能)を返す
awaitFirstOrDefault
最初の要素を取得するまで待機。要素が発行されない場合はデフォルト値を返す
// Rx系
private fun getObservable(): Observable<String> =
Observable.create { emitter ->
emitter.onNext("Hello")
emitter.onNext("World")
if (true) {
emitter.onComplete()
} else {
emitter.onError(Exception("ERROR") )
}
}
// Rx系 → Coroutines
suspend fun runObservable(): String {
return withContext(Dispatchers.IO) {
try {
val observable = getObservable()
observable.awaitFirst()
} catch (e: Exception) {
e.printStackTrace()
"Error occurred"
}
}
}
asFlow:Observable → Flow
Observable
をCoroutinesのFlow
に変換したい場合はasFlow
メソッドを使用します。
GlobalScope.launch {
getObservable().asFlow().collect { value ->
println(value) // "Hello" と "World" が順に出力される
}
}
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。