【Kotlin/Android】RxJavaのmergeやzipの使い方と違い!並列処理の実装
この記事からわかること
- Android Studio/KotlinでRxJavaの使い方
- 並列処理を実装する方法
- mergeメソッドの使い方
- zipメソッドの使い方
index
[open]
\ アプリをリリースしました /
友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-
posted withアプリーチ
環境
- Android Studio:Flamingo
- Kotlin:1.8.20
RxJavaで並列処理を実装する方法
KotlinでRxJavaを使用して非同期処理を並列(同時実行)に実装するためのメソッドとしてmerge
やzip
などが用意されています。どれも複数のObservable
を扱う際に使用しますが、微妙に挙動や使い方が異なるので両者の違いと特徴をまとめていきます。
mergeメソッド
- 複数のObservableを結合して1つのObservableへ変換
- 結合して1つになったObservableから複数のObservableのイベントを取得可能
- イベントは複数のObservableからイベントが発火される度に流れる
- 処理は並列に行われ、処理の早いイベントから流れて取得できる
zipメソッド
- 複数のObservableに対応する位置の値をペアにして出力
- 処理は並列に行われるが値が流れるのはまとめている全ての値が揃った時
非同期処理を直列(順序実行)に実装したい場合はconcat
/concatWith
メソッドを使用します。使用方法は以下の記事を参考にしてください。
mergeメソッド
merge
メソッドは複数のObservableからのイベントを単一のObservableに順次結合するメソッドです。複数のObservableをmergeした場合も購読して流れてくる値は1つのみです。全てのObservableでイベントが発火するタイミングで同じ1つに結合されたObservableから値を観測することができるので全てのObservableの値が揃うのを待つわけではありません。
val observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(5)
.map { "Observable 1: $it" }
.subscribeOn(Schedulers.computation())
val observable2 = Observable.interval(5000, TimeUnit.MILLISECONDS)
.take(5)
.map { "Observable 2: $it" }
.subscribeOn(Schedulers.computation())
Observable
.merge(observable1, observable2)
.subscribe { value -> println(value) }
.addTo(compositeDisposable)
そのため結合したObservableから取得できるイベントの順番は全てのObservableの中で先に完了した順序通りになります。observable1の後にobservable2ではなく処理の早いほうが先に出力されます。
時間 出力
10:23:31.864 Observable 1: 0
10:23:32.865 Observable 1: 1
10:23:33.866 Observable 1: 2
10:23:34.866 Observable 1: 3
10:23:35.866 Observable 1: 4
10:23:35.870 Observable 2: 0
10:23:40.866 Observable 2: 1
10:23:45.866 Observable 2: 2
10:23:50.866 Observable 2: 3
10:23:55.866 Observable 2: 4
zipメソッド
zip
メソッドは複数のObservableに対応する位置の値をペアにして出力します。値が流れるのはzipでまとめている値が全て揃った時です。PublishSubject
を使用すると値が流れるタイミングがわかりやすいです。
val publisher1 = PublishSubject.create<Int>()
val publisher2 = PublishSubject.create<String>()
val publisher3 = PublishSubject.create<Boolean>()
Observable
.zip(
publisher1,
publisher2,
publisher3,
{ value1, value2, value3 -> Triple(value1, value2, value3) }
).subscribe { value -> println(value) }
.addTo(compositeDisposable)
publisher1.onNext(1)
publisher2.onNext("A")
publisher3.onNext(false) // 出力: (1, "A", false) ここで流れる
Observable
で実装するなら以下のような感じです。以下の場合一番遅い10秒ごとに値が出力されるようになります。
val observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(5)
.map { "Observable 1: $it" }
.subscribeOn(Schedulers.computation())
val observable2 = Observable.interval(5000, TimeUnit.MILLISECONDS)
.take(5)
.map { "Observable 2: $it" }
.subscribeOn(Schedulers.computation())
val observable3 = Observable.interval(10000, TimeUnit.MILLISECONDS)
.take(5)
.map { "Observable 3: $it" }
.subscribeOn(Schedulers.computation())
Observable
.zip(
observable1,
observable2,
observable3,
{ value1, value2, value3 -> Triple(value1, value2, value3) }
).subscribe { println("zip: $it") }
.addTo(compositeDisposable)
SwiftのCombineの場合
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。