【Flutter/Dart】Streamのmapやwhere、expandや並行/直列の連結方法!

【Flutter/Dart】Streamのmapやwhere、expandや並行/直列の連結方法!

この記事からわかること

  • Flutter/DartStream非同期処理実装する方法
  • mapwhere使い方
  • 複数のStreamを直列/並列連結させるには?

\ アプリをリリースしました /

みんなの誕生日

友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-

posted withアプリーチ

環境

Streamクラスのmapwhereexpandメソッド、並行/直列の連結方法などをまとめていきます。

mapメソッド

map流れてきたデータイベントに任意の処理を加えて流し直すメソッドです。クロージャーの引数から流れてくる値を参照することができ、返り値に整形したデータを指定します。取得できるのはデータのみで完了やエラーイベントを取得することはできません。

Stream<int> stream = Stream.fromIterable([1, 2, 3]);
stream
  .map((value) => value * 2)
  .listen(
  (data) => print('受信: $data'),
  onError: (e) => print('エラー: $e'),
  onDone: () => print('完了'),
);

直接関数を渡すことも可能です。

final toDouble = (value) => value * 2;
stream
  .map(toDouble)

whereメソッド

where流れてきたデータイベントが条件にマッチしていれば流し直すメソッドです。クロージャーの引数から流れてくる値を参照することができ、返り値に条件結果の真偽値を指定します。

stream
  .where((value) => value % 2 == 0)
  .listen(
  (data) => print('受信: $data'),
  onError: (e) => print('エラー: $e'),
  onDone: () => print('完了'),
);

Streamの連結(直列)

複数のStreamを直列に連結するためにはasync*yield*を使用します。yield*はStreamをそのまま流すことができるキーワードですが1つ以上でも動作するようになっています。

おすすめ記事:async* / yieldで自作する

Stream<int> delayedStream(int delay) async* {
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: delay));
    yield i;
  }
}

/// 2つのStreamを直列に連結させたStream
Stream<int> combinedStream() async* {
  yield* delayedStream(2).map((value) => value * 2);
  yield* delayedStream(1);
}

void main() async {
  combinedStream().listen(
    (data) => print('受信: $data'),
    onError: (e) => print('エラー: $e'),
    onDone: () => print('完了'),
  );
}

yield*で連ねる場合は直列に連結されるようになります。つまり最初のdelayedStream(2)が完了になればdelayedStream(1)が呼ばれます。1つ目のStreamでエラーが発生すれば後続の処理も停止します。

Streamの連結(並列):StreamGroup.merge

複数のStreamを並列に連結するためにはStreamGroupmergeメソッドを使用します。これを使用するためにはpackage:async/async.dartを追加します。

import 'package:async/async.dart';

Stream<int> delayedStream(int delay) async* {
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: delay));
    yield i;
  }
}

/// 2つのStreamを並列に連結させたStream
final merged = StreamGroup.merge([
  delayedStream(2).map((value) => value * 5), 
  delayedStream(1)
]);


void main() async {
  merged.listen(
    (data) => print('受信: $data'),
    onError: (e) => print('エラー: $e'),
    onDone: () => print('完了'),
  );
}

並列に実行している場合にエラーが発生すればそのStreamの処理は停止しますが、他のStreamには影響しません。そのためエラーはキャッチできますが、最終的に完了も呼ばれるような挙動になります。

import 'package:async/async.dart';

Stream<int> delayedStream(int delay) async* {
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: delay));
    if (i == 2) {
      await Future.error("ERROR");
    } else {
      yield i;
    }
    
  }
}

final merged = StreamGroup.merge([
  delayedStream(2).map((value) => value * 5), 
  delayedStream(1)
]);


void main() async {
  merged.listen(
    (data) => print('受信: $data'),
    onError: (e) => print('エラー: $e'),
    onDone: () => print('完了'),
  );
}

// ログ
受信: 1
受信: 5
エラー: ERROR
エラー: ERROR
完了

まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。

ご覧いただきありがとうございました。

searchbox

スポンサー

ProFile

ame

趣味:読書,プログラミング学習,サイト制作,ブログ

IT嫌いを克服するためにITパスを取得しようと勉強してからサイト制作が趣味に変わりました笑
今はCMSを使わずこのサイトを完全自作でサイト運営中〜

New Article