【Flutter/Dart】Streamのmapやwhere、expandや並行/直列の連結方法!

この記事からわかること
- Flutter/DartでStreamで非同期処理を実装する方法
- mapやwhereの使い方
- 複数のStreamを直列/並列に連結させるには?
index
[open]
\ アプリをリリースしました /
環境
- Android Studio:Meerkat
- Xcode:16.0
- Flutter:3.29.2
- Dart:3.7.2
- Mac M1:Sequoia 15.4
Stream
クラスのmap
やwhere
、expand
メソッド、並行/直列の連結方法などをまとめていきます。
mapメソッド
map
は流れてきたデータイベントに任意の処理を加えて流し直すメソッドです。クロージャーの引数から流れてくる値を参照することができ、返り値に整形したデータを指定します。取得できるのはデータのみで完了やエラーイベントを取得することはできません。
Stream<int> stream = Stream.fromIterable([1, 2, 3]);
stream
.map((value) => value * 2)
.listen(
(data) => print('受信: $data'),
onError: (e) => print('エラー: $e'),
onDone: () => print('完了'),
);
直接関数を渡すことも可能です。
final toDouble = (value) => value * 2;
stream
.map(toDouble)
whereメソッド
where
は流れてきたデータイベントが条件にマッチしていれば流し直すメソッドです。クロージャーの引数から流れてくる値を参照することができ、返り値に条件結果の真偽値を指定します。
stream
.where((value) => value % 2 == 0)
.listen(
(data) => print('受信: $data'),
onError: (e) => print('エラー: $e'),
onDone: () => print('完了'),
);
Streamの連結(直列)
複数のStreamを直列に連結するためにはasync*
とyield*
を使用します。yield*
はStreamをそのまま流すことができるキーワードですが1つ以上でも動作するようになっています。
おすすめ記事:async* / yieldで自作する
Stream<int> delayedStream(int delay) async* {
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: delay));
yield i;
}
}
/// 2つのStreamを直列に連結させたStream
Stream<int> combinedStream() async* {
yield* delayedStream(2).map((value) => value * 2);
yield* delayedStream(1);
}
void main() async {
combinedStream().listen(
(data) => print('受信: $data'),
onError: (e) => print('エラー: $e'),
onDone: () => print('完了'),
);
}
yield*
で連ねる場合は直列に連結されるようになります。つまり最初のdelayedStream(2)
が完了になればdelayedStream(1)
が呼ばれます。1つ目のStreamでエラーが発生すれば後続の処理も停止します。
Streamの連結(並列):StreamGroup.merge
複数のStreamを並列に連結するためにはStreamGroup
のmerge
メソッドを使用します。これを使用するためにはpackage:async/async.dart
を追加します。
import 'package:async/async.dart';
Stream<int> delayedStream(int delay) async* {
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: delay));
yield i;
}
}
/// 2つのStreamを並列に連結させたStream
final merged = StreamGroup.merge([
delayedStream(2).map((value) => value * 5),
delayedStream(1)
]);
void main() async {
merged.listen(
(data) => print('受信: $data'),
onError: (e) => print('エラー: $e'),
onDone: () => print('完了'),
);
}
並列に実行している場合にエラーが発生すればそのStreamの処理は停止しますが、他のStreamには影響しません。そのためエラーはキャッチできますが、最終的に完了も呼ばれるような挙動になります。
import 'package:async/async.dart';
Stream<int> delayedStream(int delay) async* {
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: delay));
if (i == 2) {
await Future.error("ERROR");
} else {
yield i;
}
}
}
final merged = StreamGroup.merge([
delayedStream(2).map((value) => value * 5),
delayedStream(1)
]);
void main() async {
merged.listen(
(data) => print('受信: $data'),
onError: (e) => print('エラー: $e'),
onDone: () => print('完了'),
);
}
// ログ
受信: 1
受信: 5
エラー: ERROR
エラー: ERROR
完了
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。