【Kotlin/Android】RxJavaのmergeやzipの使い方と違い!並列処理の実装

【Kotlin/Android】RxJavaのmergeやzipの使い方と違い!並列処理の実装

この記事からわかること

  • Android Studio/KotlinRxJava使い方
  • 並列処理実装する方法
  • mergeメソッドの使い方
  • zipメソッドの使い方

index

[open]

\ アプリをリリースしました /

みんなの誕生日

友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-

posted withアプリーチ

環境

RxJavaで並列処理を実装する方法

KotlinでRxJavaを使用して非同期処理を並列(同時実行)に実装するためのメソッドとしてmergezipなどが用意されています。どれも複数のObservableを扱う際に使用しますが、微妙に挙動や使い方が異なるので両者の違いと特徴をまとめていきます。

mergeメソッド

zipメソッド

非同期処理を直列(順序実行)に実装したい場合はconcat/concatWithメソッドを使用します。使用方法は以下の記事を参考にしてください。

mergeメソッド

mergeメソッドは複数のObservableからのイベントを単一のObservableに順次結合するメソッドです。複数のObservableをmergeした場合も購読して流れてくる値は1つのみです。全てのObservableでイベントが発火するタイミングで同じ1つに結合されたObservableから値を観測することができるので全てのObservableの値が揃うのを待つわけではありません。

val observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .take(5)
    .map { "Observable 1: $it" }
    .subscribeOn(Schedulers.computation())

val observable2 = Observable.interval(5000, TimeUnit.MILLISECONDS)
    .take(5)
    .map { "Observable 2: $it" }
    .subscribeOn(Schedulers.computation())

Observable
    .merge(observable1, observable2)
    .subscribe { value -> println(value) }
    .addTo(compositeDisposable)

そのため結合したObservableから取得できるイベントの順番は全てのObservableの中で先に完了した順序通りになります。observable1の後にobservable2ではなく処理の早いほうが先に出力されます。

時間         出力
10:23:31.864    Observable 1: 0
10:23:32.865    Observable 1: 1
10:23:33.866    Observable 1: 2
10:23:34.866    Observable 1: 3
10:23:35.866    Observable 1: 4
10:23:35.870    Observable 2: 0
10:23:40.866    Observable 2: 1
10:23:45.866    Observable 2: 2
10:23:50.866    Observable 2: 3
10:23:55.866    Observable 2: 4

zipメソッド

zipメソッドは複数のObservableに対応する位置の値をペアにして出力します。値が流れるのはzipでまとめている値が全て揃った時です。PublishSubjectを使用すると値が流れるタイミングがわかりやすいです。

val publisher1 = PublishSubject.create<Int>()
val publisher2 = PublishSubject.create<String>()
val publisher3 = PublishSubject.create<Boolean>()

Observable
    .zip(
        publisher1,
        publisher2,
        publisher3,
        { value1, value2, value3 -> Triple(value1, value2, value3) }
    ).subscribe { value -> println(value) }
    .addTo(compositeDisposable)

publisher1.onNext(1)
publisher2.onNext("A")
publisher3.onNext(false)  // 出力: (1, "A", false) ここで流れる

Observableで実装するなら以下のような感じです。以下の場合一番遅い10秒ごとに値が出力されるようになります。

val observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .take(5)
    .map { "Observable 1: $it" }
    .subscribeOn(Schedulers.computation())

val observable2 = Observable.interval(5000, TimeUnit.MILLISECONDS)
    .take(5)
    .map { "Observable 2: $it" }
    .subscribeOn(Schedulers.computation())

val observable3 = Observable.interval(10000, TimeUnit.MILLISECONDS)
    .take(5)
    .map { "Observable 3: $it" }
    .subscribeOn(Schedulers.computation())

Observable
    .zip(
        observable1,
        observable2,
        observable3,
        { value1, value2, value3 -> Triple(value1, value2, value3) }
    ).subscribe { println("zip: $it") }
    .addTo(compositeDisposable)

SwiftのCombineの場合

まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。

ご覧いただきありがとうございました。

searchbox

スポンサー

ProFile

ame

趣味:読書,プログラミング学習,サイト制作,ブログ

IT嫌いを克服するためにITパスを取得しようと勉強してからサイト制作が趣味に変わりました笑
今はCMSを使わずこのサイトを完全自作でサイト運営中〜

New Article

index