【Flutter/Dart】Streamクラスの使い方!非同期処理と購読(listen)

この記事からわかること
- Flutter/DartでStreamで非同期処理を実装する方法
- 購読(listen)の使い方
index
[open]
\ アプリをリリースしました /
環境
- Android Studio:Meerkat
- Xcode:16.0
- Flutter:3.29.2
- Dart:3.7.2
- Mac M1:Sequoia 15.4
公式リファレンス:Asynchronous programming: Streams
Dartでの非同期処理
アプリ開発をしていく中で非同期処理の実装はマストになってきます。「非同期処理」とは処理の結果を待たずに並行して別の処理を進めるような処理のことです。例えばネットワーク通信やデータベースのCRUD操作、ファイルの読み書きなど処理に時間のかかる操作はアプリが一時的に停止してしまう原因になります。これらの処理を非同期で実行できるようにすることで、ユーザーはアプリをスムーズに操作し続けることができます。
反対の言葉で「同期処理」があります。これは処理が完了するまで次の処理には進まず、すべてを順番通りに実行する処理方法です。時間のかかる処理を同期的に行うと、アプリの応答が止まってしまいUXが低下してしまいます。
Dartにおける非同期プログラミングはFuture
クラスとStream
クラス、async
/await
キーワードなどによって制御することが可能になっています。
- Future・・・1回だけの非同期結果
- Stream・・・複数の非同期イベント(値・エラー・完了)
Streamクラス
Stream
は非同期で実行される処理の流れの中で複数回流れるイベントを管理するためのオブジェクトです。そもそもストリームとは日本語で「流れ」を意味する英単語であり、プログラミングの分野ではデータの入出力の流れを保持する抽象的なオブジェクトのことを指します。DartのStreamでは時間の流れの中に「データの変更から完了またはエラーまで」を一連の順序(シーケンス)として保持しています。
その一連の流れはマーブルダイアグラムとして表現されます。左から右に行くにつれ時間の経過を表し、その過程の中でデータの変化などのイベントを検知、最終的には完了またはエラーでストリームは終了します。

インスタンス化方法
使い方は関数の返り値をStream<任意のデータ型>
で定義します。Stream
型の定義方法は色々ありますが、まずはシンプルにIterable
型からStream
型を構築するfromIterable
メソッドを使用する方法を例に見てみます。以下の場合は完了/エラーは流さず、データだけを順番にストリームに流していきます。
/// Iterable型からStreamを生成する
Stream<int> stream = Stream.fromIterable([1, 2, 3]);
1つの値だけ
final stream = Stream.value(100);
エラーだけ
final stream = Stream.error('エラー発生');
listenメソッド(購読)
Streamは生成するだけではイベントを取得できません。イベントを取得するためにはlisten
メソッドを使用してストリームを購読する必要があります。定義を確認するとonData
でイベントデータをonError
でイベントエラーをonDone
で完了イベントを取得できることがわかります。
StreamSubscription<T> listen(
void onData(T event)?, {
Function? onError,
void onDone()?,
bool? cancelOnError,
});
実際のコードでは以下のようになります。先ほどのStream場合はコレクションの要素がonData
として流れ、コレクションが最後まで到達するとonDone
が呼ばれます。
Stream<int> stream = Stream.fromIterable([1, 2, 3]);
stream.listen(
(data) => print('受信: $data'),
onError: (e) => print('エラー: $e'),
onDone: () => print('完了'),
);
Streamはこのlisten
メソッドで購読を開始しないと動作が始まらないので注意してください。
複数の購読はできない
生成されたStreamを購読できるのは基本的に1つまでになっています。そのため1つ目のStreamが閉じる前に2つ目の購読を試みるとエラーを吐きます。「閉じる前に」というのがポイントで1つ目のStreamが2つ目の購読前に即座に完了 or Errorまで到達していれば問題なく購読することが可能です。
// Streamを自作する記述方(後述)
// 完了までに3秒以上かかるStream
Stream<int> delayedStream() async* {
// インクリメントしながら3回値を流す
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
void main() {
final stream = delayedStream();
// 1つ目の購読
stream.listen((value) => print('Listener 1: $value'));
// 2つ目はエラー:Uncaught Error, error: Error: Bad state: Stream has already been listened to.
stream.listen((value) => print('Listener 2: $value'));
}
asBroadcastStreamメソッド
複数の購読を行いたい場合はasBroadcastStream
メソッドを使用します。これを使用すると「ブロードキャスト型のStream」に変換してくれます。購読するタイミングによっては既にイベントが流れ終えている可能性もありますが、過去のデータは取得できずにこれから流れるイベントのみを購読することができるようになります。
void main() {
final stream = delayedStream().asBroadcastStream();
// 1つ目の購読
stream.listen((value) => print('Listener 1: $value'));
// 2つ目の購読(エラーにならない)
stream.listen((value) => print('Listener 2: $value'));
}
購読を停止する
購読を結果を待たずに途中で停止するにはlisten
の返り値を保持しておきStreamSubscription
型からcancel
メソッドを呼び出します。
final subscription = stream.listen((value) => print('Listener 1: $value'));
// 購読を停止する
subscription.cancel();
await for
listen
を使用せずにデータを参照するにはawait for
を使用します。Streamから流れてくるデータをコレクションの様に取り出しながら処理することが可能です。
void main() async {
await for (final value in delayedStream()) {
print('受信: $value');
}
}
エラーはそのままではキャッチできないのでtry ~ catch
文を使用する必要があります。
try {
await for (final value in delayedStream()) {
print('受信: $value');
}
} catch (e) {
print('エラーが発生しました: $e');
}
カスタムStreamを自作する
カスタムでStreamを作成する方法はいくつか存在します。
- async* / yield
- periodic
- StreamController
async* / yield
Stream
はasync*
/yield
を使用して自作することも可能です。async*
で非同期ストリームを返す関数であることを定義し、yield
で値をストリームに流します(emit)。
/// Stream関数
Stream<int> delayedStream() async* {
// インクリメントしながら3回値を流す
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: 1));
// 値をemit
yield i;
}
}
yield*
で別のStream
をそのまま流すことも可能です。
Stream<int> numbers1to3() async* {
// 他のストリームを流す
yield* Stream.fromIterable([1, 2, 3]);
}
throw Exception
ではなくFuture.error()
を使用してエラーを発生させたい場合はawait
キーワードを付与する必要があります。これを付与し忘れるとエラーがスローされないので注意してください。
/// Stream関数
Stream<int> delayedStream() async* {
// インクリメントしながら3回値を流す
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: 1));
if (i == 3) {
await Future.error("ERROR");
} else {
yield i;
}
}
}
periodic
periodic
で一定の時間ごとにデータを発行できるタイマー付きのStreamを作成することも可能です。引数period
には発行したい間隔をDuration
で指定し、computation
に実際にデータを流すための処理を記述します。
Stream<T> Stream.periodic(
Duration period,
[T computation(int count)?]
)
使用する際は無限にデータが流れてしまうためtake
やcancel
を使用して処理をSTOPさせるようにする必要があるので注意してください。
final stream = Stream.periodic(Duration(seconds: 1), (count) => count);
await for (var value in stream.take(5)) {
print('受信: $value');
}
StreamController
StreamController
は任意のタイミングでイベントを操作できる管理クラスです。add
やaddError
、close
でデータやエラー、完了などを通知することが可能です。
import 'dart:async';
final controller = StreamController<bool>();
// イベントを流す
controller.add(false);
// 購読
controller.stream
.listen((event) {
print('受信: $event');
});
// 明示的に閉じる(完了通知)
controller.close();
購読したい場合はstream
プロパティからStream
型を参照することができます。
sinkメソッドで責務を分離
StreamController
では直接add
などが可能ですがsink
を使用することで入力用のポートを分離させることができます。要は管理クラスなどを作成した際に読み取り用と書き込み用を分けて実装することが可能です。
class FlagService {
final _controller = StreamController<bool>();
/// 観測用のStream(読み取り用)
Stream<bool> get stream => _controller.stream;
/// イベント発火用のSink(書き込み用)
Sink<bool> get sink => _controller.sink;
void send(bool flag) {
sink.add(flag);
}
void dispose() {
sink.close();
}
}
Sink
からの操作(add
やaddError
、close
など)はほとんど変わらずに使用することができます。
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。