【Kotlin/Android】RxJavaの使い方と導入方法!Observableオブジェクト
この記事からわかること
- Android Studio/KotlinでRxJavaの使い方
- ReactiveXの特徴とメリット
- Observableオブジェクトの生成方法
- create/just/fromIterable/intervalメソッドの使い方
index
[open]
\ アプリをリリースしました /
友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-
posted withアプリーチ
環境
- Android Studio:Flamingo
- Kotlin:1.8.20
RxJavaとは?
そもそもRxJavaとはMicrosoftが2011年にリリースしたReactiveX(Reactive Extensions)と呼ばれるライブラリのJava版です。ReactiveXはもともと.NET Framework用に開発されたライブラリで、その後JavaだけでなくC++やSwift、JavaScriptなど数多くのプラットフォームにも提供されるようになりました。
おすすめ記事:RxSwiftとは?導入方法と使い方まとめ!ストリームを理解する
RxJava自体もKotlinでそのまま活用することができますがRxKotlinも一緒に導入することでより扱いやすく実装することができるようになります。
ReactiveXの概要
ReactiveXの公式サイトによる説明は以下のようになっています。
”ReactiveXは、監視可能なシーケンスを使用して、非同期およびイベントベースのプログラムを作成するためのライブラリです。
オブザーバーパターンを拡張してデータやイベントのシーケンスをサポートし、低レベルのスレッド化、同期化、スレッド セーフ、並行データ構造、および非I/O をブロックします。”
要約すると非同期処理やイベント処理、時間経過に関数処理を1つのシーケンス(順序)として操作、観測できる機能を提供するAPIということでしょうか。
またReactiveXは「リアクティブプログラミング」と呼ばれるプログラミング手法を取り入れていると言われています。リアクティブプログラミングの良い例がExcelで「値を入力すると自動で計算結果が変更される」のがポイントになるようです。
RxJavaの導入方法
RxJavaをAndroid Studioで使用するために「bundle.gradle(Module)」に以下の文を追加して「Sync Now」をクリックします。2024年現在、RxJavaは3.x系までリリースされているようです。
dependencies {
// RxJava/RxJava/RxAndroid
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
}
この際にRxAndroid
も一緒に導入しておきます。これはRxJavaを使用する上でUIスレッドへのスケジューリングやAndroidコンポーネントとの統合を行うためのユーティリティが含まれているようです。
RxJavaを使用する上でのポイント
RxJavaだけでなくReactiveX系を使用する上では重要になってくるポイントがいくつかあります。
- Stream:データの入出力の流れを保持する抽象的なオブジェクトのこと
- Observable:イベントを発火するオブジェクト(観測される側)
- Flowable:バックプレッシャー機能をサポートしたObservable
- Operators:Observableから流れてくるデータを整形する
- Subject:観測する側
Stream
ReactiveXでは基本的にストリームという概念の元に成り立っており、時間の流れの中に「データの変更から完了またはエラーまで」を一連の順序(シーケンス)として保持しています。
おすすめ記事:ストリーム(Stream)とシーケンス(Sequence)とは?
Observerパターン
RxJavaはデザインパターンの一種であるObserverパターンが採用されています。Observerパターンとは「状態変化などを観測し、通知する」を根本とした考え方のことです。
おすすめ記事:【GoF】Observerパターンとは?Publish-Subscribeパターンとの違い
Flowable
Flowable
はバックプレッシャー機能をサポートしたObservableオブジェクトの1種です。
Operators
RxJavaではさらにOperators
という概念があります。これは生成されているObservableのイベントの編集や絞り込みを行った新しいObservableを生成する役割を持っています。それ以外にも複数のObservableを結合することも可能です。
おすすめ記事:【Kotlin/Android】RxJavaのandThenメソッドの使い方!順番に処理する方法
今回はObservableに焦点を当ててまとめていきます。
Observable
Observableとは日本語で「観測可能」を意味する英単語です。ReactiveXではストリームを観測、検知可能なクラスのことをObservableと呼び、変化やエラー、完了を検知すると通知してくれます。onNext
はイベントの発生のたびに複数回検知される可能性があり、onError
またはonCompleted
はストリームの中でどちらか1回しか観測されません。そしてそのどちらかを検知したタイミングまたは明示的に指定したタイミングで監視は終了します。
種類 | 概要 |
---|---|
onNext | イベントを検知 |
onError | エラーの発生を検知 |
onCompleted | 完了を検知 |
Observableクラスの使い方
RxJavaではObservableクラスとして用意されています。例えば以下は「10秒ごとにイベントを発火し任意のカウントダウンするプログラム」です。
class MainActivity : AppCompatActivity() {
var disposable: Disposable? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposable = Observable.interval(1, TimeUnit.SECONDS)
.take(10) // カウントダウンの回数
.map { count -> 10 - count } // カウントダウンの数値
.subscribe(
::onNext, // データ受信時の処理
Throwable::printStackTrace, // エラー時の処理
{ Log.i("RxJava-Test","Finish") } // 完了時の処理
)
}
override fun onDestroy() {
super.onDestroy()
// プログラムが終了する前にObservableの観測を解除
disposable?.dispose()
}
private fun onNext(count: Long) {
Log.i("RxJava-Test","Countdown: $count")
}
}
重要になってくるのが以下の部分です。
// Observableオブジェクトの生成
disposable = Observable.interval(1, TimeUnit.SECONDS)
// Operatorを使用してデータを整形
.take(10)
.map { count -> 10 - count }
// Observableオブジェクトを観測して変化やエラー、完了で処理を分岐
.subscribe(
::onNext,
Throwable::printStackTrace,
{ Log.i("RxJava-Test","Finish") }
)
ここで行なっているのは以下のとおりです。Observableオブジェクトを生成するだけではイベントは発火せず、subscribeすることでストリームが開始されます。
- Observableオブジェクトを生成
- Operatorを使用してデータを整形
- Observableオブジェクトを観測(subscribe)して変化やエラー、完了で処理を分岐
Disposableに格納する
Observableオブジェクトを生成して観測しストリームが開始されるとした場合はsubscribe
メソッドの返り値としてDisposable
型を受け取ります。Disposable
オブジェクトは観測を停止させる手段を提供します。
観測を停止させたいタイミングや観測が不要になるタイミングでdispose
メソッドを呼び出すことで明示的に観測を解除することができます。
Observableオブジェクトの生成方法
Observable
オブジェクトを生成する方法はいくつか存在します。
createメソッド
create
メソッドはシンプルなObservableオブジェクトを生成できるメソッドです。create<String>
のようにイベントで流したいデータ型を指定し、ラムダ式の中でemitterを使用して手動でイベントを発行することができます。onNext
メソッドで流したいデータ型をonComplete
メソッドで完了を通知することができます。
val observable = Observable.create<String> { emitter ->
emitter.onNext("Hello")
emitter.onNext("World")
var flag = true // 何らかの条件分岐
if (flag) {
emitter.onComplete()
} else {
emitter.onError(Exception("ERROR") )
}
}
justメソッド
just
メソッドは引数に渡したデータを発行するObservableオブジェクトを生成できるメソッドです。このメソッドは単純に値だけを流したい場合に使用する想定で、完了などを通知することはできません。
val observable = Observable.just("Hello", "World")
fromIterableメソッド
fromIterable
メソッドは引数に渡したIterableから値を発行するObservableオブジェクトを生成できるメソッドです。Iterable
、つまりコレクションや配列などの複数の要素を持つデータ構造のデータをObservableにすることができます。
val list = listOf("Hello", "World")
val observable = Observable.fromIterable(list)
intervalメソッド
interval
メソッドは指定した間隔で連続的に値を発行するObservableオブジェクトを生成できるメソッドです。つ目の引数で時間間隔を指定1し、12つ目の引数でその単位を指定1しています。
val observable = Observable.interval(1, TimeUnit.SECONDS)
Kotlin Coroutinesへの変換はこちら
RxSwiftはこちら
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。