【Kotlin/Android】RxJavaのFlowableの使い方!Observableとの違い

この記事からわかること
- Android Studio/KotlinでRxJavaの使い方
- Flowableオブジェクトとは?
- Observableとの違い
- バックプレッシャーとは?
- subscribeByでは動作しない?
index
[open]
\ アプリをリリースしました /
環境
- Android Studio:Flamingo
- Kotlin:1.8.20
Flowableとは?
RxJavaのFlowable
はバックプレッシャー機能をサポートしたObservableオブジェクトの1種です。Observable
クラスと同じようにデータストリームの中で各イベントを通知し、観測しているObserverが処理を実行します。
Observable
クラスとの違いはバックプレッシャー機能をサポートしているかいないかです。
バックプレッシャー機能とは?
「バックプレッシャー機能」とはデータフローを制御するためのメカニズムです。流れるデータが膨大な場合や処理速度が重い場合などデータの供給と処理速度が釣り合っていない場合にはメモリ消費が増加し、アプリケーションの性能の低下やクラッシュを引き起こす可能性が孕んできます。
これを防ぐための機能がバックプレッシャー機能であり、具体的にいうとデータの提供スピードをコントロールすることで適切にデータを扱えるようになります。
インスタンスの生成方法
Flowable
オブジェクトの生成方法は基本的にObservable
クラスと変わず、create
やjust
、fromIterable
、interval
などを使用します。
おすすめ記事:Observableオブジェクトの生成方法!create/just/fromIterable/intervalメソッドの使い方
バックプレッシャー機能の設定は2つ目の引数でBackpressureStrategy
型で指定します。
Flowable.create<String>(
{ emitter ->
emitter.onNext("Hello")
emitter.onNext("World")
emitter.onComplete()
}, BackpressureStrategy.BUFFER
).subscribe(
{ length ->
Log.e("RxJava", length.toString())
},
{ error ->
Log.e("RxJava", "Error: $error")
}
).addTo(compositeDisposable)
BackpressureStrategy
BackpressureStrategy
として以下が定義されています。
- MISSING:バックプレッシャー機能無効
- ERROR:オブザーバーの処理が間に合わない場合、エラーを発生
- BUFFER:オブザーバーが処理できるまで、供給しているデータをバッファリング
- DROP:オブザーバーの処理が間に合わない場合、供給しているデータをドロップ
- LATEST:最新に供給されたデータのみを保持し、オブザーバーが遅い場合は以前のデータを破棄
参考文献:Introduction to RxJava Flowable
subscribeByでは動作しない?
RxKotlinのsubscribeBy
を使用して購読しようとした場合、正常にFlowable
からのイベントを処理することができませんでした。subscribeBy
はバックプレッシャー機能をサポートしていないのかもしれません。
Flowable.create<String>(
{ emitter ->
emitter.onNext("Hello")
emitter.onNext("World")
emitter.onComplete()
}, BackpressureStrategy.BUFFER
).subscribeBy(
// UI更新処理など
{ length ->
Log.e("RxJava", length.toString())
}
).addTo(compositeDisposable)
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。