【Android】kotlinx-coroutines-rx2の使い方!RxJava → Kotlin Coroutinesへの置き換え

【Android】kotlinx-coroutines-rx2の使い方!RxJava → Kotlin Coroutinesへの置き換え

この記事からわかること

  • Android Studio/KotlinRxJava使い方
  • Kotlin Coroutinesへの置き換え
  • kotlinx-coroutines-rx2ライブラリ実装する方法
  • awaitasFlowメソッド役割

index

[open]

\ アプリをリリースしました /

みんなの誕生日

友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-

posted withアプリーチ

環境

RxJava → Kotlin Coroutinesへの置き換え

RxJavaを使用して実装されたコードをKotlin Coroutinesへ置き換えたい場合に活用できるのが「kotlinx-coroutines-rx2」ライブラリです。RxJavaのコードはやはり独特で直感的でない分、学習コストも高いのが難点です。Kotlin Coroutinesなら比較的見通しの良いコードで非同期処理を実装できるので最近はKotlin Coroutinesを利用する案件も増えてきたように思います。

RxJava

RxJavaはReactiveX(Reactive Extensions)と呼ばれるライブラリのJava版。「ストリーム」と呼ばれるデータの流れを基本として操作することができる。

Kotlin Coroutines

Kotlin Coroutinesは非同期処理を実装できる公式ライブラリ。「コルーチン」という単位で非同期処理が管理、操作することができる。

kotlinx-coroutines-rx2

公式リファレンス:kotlinx-coroutines-rx2

kotlinx-coroutines-rx2」はKotlin CoroutinesとRxJava2を相互運用できる機能を提供するライブラリです。具体的にはRxJavaのObservableやFlowableなどのRx型をKotlin Coroutinesのsuspend関数やフローに変換したり、その逆の変換をすることができます。

RxJava → Kotlin Coroutines

ここからは「RxJava → Kotlin Coroutines」に変換できる機能を紹介していきます。

awaitメソッド:Completable → suspend

awaitメソッドはRxのCompletableをCoroutinesに変更するためのメソッドです。Completable完了またはエラーの通知が来るまで待機します。

例えば以下のようにCompletableを返すgetCompletableが定義してあったとします。

private fun getCompletable(success: Boolean): Completable =
    Completable.create { emitter ->
        if (success) {
            emitter.onComplete()
        } else {
            emitter.onError(Exception("Completable error occurred"))
        }
    }

getCompletableをsuspend関数にするためにawaitを使用することでCompletableの結果を待機して結果を返したりすることができます。onErrorが流れた際はawaitが例外をスローするのでtry〜catch文で捕捉できます。

suspend fun runCompletable(): Boolean {
    return withContext(Dispatchers.IO) {
        try {
            getCompletable(true).await()
            println("Completable completed successfully")
            true
        } catch (e: Exception) {
            e.printStackTrace()
            println("Error occurred: ${e.message}")
            false
        }
    }
}

呼び出す時は以下のような感じになります。

GlobalScope.launch {
    val result = runCompletable()
    println("result: $result")
}

awaitFirst:Observable → suspend

ObservableをCoroutinesに変更したい場合はawaitFirstなどいくつか用意されています。

awaitFirst

最初の要素を取得するまで待機。要素が発行されない場合はエラー

awaitLast

最後の要素を取得するまで待機。要素が発行されない場合はエラー

awaitSingle

要素が1つ流れた後に完了を取得するまで待機。要素が発行されないまたは複数の要素が流れる場合はエラー

awaitFirstOrNull

最初の要素を取得するまで待機。要素が発行されない場合はnullを返す

awaitFirstOrElse

最初の要素を取得するまで待機。要素が発行されないデフォルト値(ラムダ式を使用可能)を返す

awaitFirstOrDefault

最初の要素を取得するまで待機。要素が発行されない場合はデフォルト値を返す

// Rx系
private fun getObservable(): Observable<String> =
  Observable.create { emitter ->
      emitter.onNext("Hello")
      emitter.onNext("World")
      if (true) {
          emitter.onComplete()
      } else {
          emitter.onError(Exception("ERROR") )
      }
  }

// Rx系 → Coroutines
suspend fun runObservable(): String {
    return withContext(Dispatchers.IO) {
        try {
            val observable = getObservable()
            observable.awaitFirst()
        } catch (e: Exception) {
            e.printStackTrace()
            "Error occurred"
        }
    }
}

asFlow:Observable → Flow

ObservableをCoroutinesのFlowに変換したい場合はasFlowメソッドを使用します。

GlobalScope.launch {
    getObservable().asFlow().collect { value ->
        println(value) // "Hello" と "World" が順に出力される
    }
}

まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。

ご覧いただきありがとうございました。

searchbox

スポンサー

ProFile

ame

趣味:読書,プログラミング学習,サイト制作,ブログ

IT嫌いを克服するためにITパスを取得しようと勉強してからサイト制作が趣味に変わりました笑
今はCMSを使わずこのサイトを完全自作でサイト運営中〜

New Article

index