【Flutter/Dart】Streamクラスの使い方!非同期処理と購読(listen)

【Flutter/Dart】Streamクラスの使い方!非同期処理と購読(listen)

この記事からわかること

  • Flutter/DartStream非同期処理実装する方法
  • 購読(listen)の使い方

\ アプリをリリースしました /

みんなの誕生日

友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-

posted withアプリーチ

環境

公式リファレンス:Asynchronous programming: Streams

Dartでの非同期処理

アプリ開発をしていく中で非同期処理の実装はマストになってきます。「非同期処理」とは処理の結果を待たずに並行して別の処理を進めるような処理のことです。例えばネットワーク通信やデータベースのCRUD操作、ファイルの読み書きなど処理に時間のかかる操作はアプリが一時的に停止してしまう原因になります。これらの処理を非同期で実行できるようにすることで、ユーザーはアプリをスムーズに操作し続けることができます。

反対の言葉で「同期処理」があります。これは処理が完了するまで次の処理には進まず、すべてを順番通りに実行する処理方法です。時間のかかる処理を同期的に行うと、アプリの応答が止まってしまいUXが低下してしまいます。

Dartにおける非同期プログラミングはFutureクラスとStreamクラス、async/awaitキーワードなどによって制御することが可能になっています。

Streamクラス

Stream非同期で実行される処理の流れの中で複数回流れるイベントを管理するためのオブジェクトです。そもそもストリームとは日本語で「流れ」を意味する英単語であり、プログラミングの分野ではデータの入出力の流れを保持する抽象的なオブジェクトのことを指します。DartのStreamでは時間の流れの中に「データの変更から完了またはエラーまで」を一連の順序(シーケンス)として保持しています。

その一連の流れはマーブルダイアグラムとして表現されます。左から右に行くにつれ時間の経過を表し、その過程の中でデータの変化などのイベントを検知、最終的には完了またはエラーでストリームは終了します。

RxSwiftとは?導入方法と使い方まとめ!ストリームを理解する

インスタンス化方法

使い方は関数の返り値をStream<任意のデータ型>で定義します。Stream型の定義方法は色々ありますが、まずはシンプルにIterable型からStream型を構築するfromIterableメソッドを使用する方法を例に見てみます。以下の場合は完了/エラーは流さず、データだけを順番にストリームに流していきます。

/// Iterable型からStreamを生成する
Stream<int> stream = Stream.fromIterable([1, 2, 3]);

1つの値だけ

final stream = Stream.value(100);

エラーだけ

final stream = Stream.error('エラー発生');

listenメソッド(購読)

Streamは生成するだけではイベントを取得できません。イベントを取得するためにはlistenメソッドを使用してストリームを購読する必要があります。定義を確認するとonDataイベントデータonErrorイベントエラーonDone完了イベントを取得できることがわかります。


StreamSubscription<T> listen(
  void onData(T event)?, {
  Function? onError,
  void onDone()?,
  bool? cancelOnError,
});

実際のコードでは以下のようになります。先ほどのStream場合はコレクションの要素がonDataとして流れ、コレクションが最後まで到達するとonDoneが呼ばれます。

Stream<int> stream = Stream.fromIterable([1, 2, 3]);

stream.listen(
  (data) => print('受信: $data'),
  onError: (e) => print('エラー: $e'),
  onDone: () => print('完了'),
);

Streamはこのlistenメソッドで購読を開始しないと動作が始まらないので注意してください。

複数の購読はできない

生成されたStreamを購読できるのは基本的に1つまでになっています。そのため1つ目のStreamが閉じる前に2つ目の購読を試みるとエラーを吐きます。「閉じる前に」というのがポイントで1つ目のStreamが2つ目の購読前に即座に完了 or Errorまで到達していれば問題なく購読することが可能です。

// Streamを自作する記述方(後述)
// 完了までに3秒以上かかるStream
Stream<int> delayedStream() async* {
  // インクリメントしながら3回値を流す
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}

void main() {
  final stream = delayedStream();
  // 1つ目の購読
  stream.listen((value) => print('Listener 1: $value'));

  // 2つ目はエラー:Uncaught Error, error: Error: Bad state: Stream has already been listened to.
  stream.listen((value) => print('Listener 2: $value'));
}

asBroadcastStreamメソッド

複数の購読を行いたい場合asBroadcastStreamメソッドを使用します。これを使用すると「ブロードキャスト型のStream」に変換してくれます。購読するタイミングによっては既にイベントが流れ終えている可能性もありますが、過去のデータは取得できずにこれから流れるイベントのみを購読することができるようになります。

void main() {
  final stream = delayedStream().asBroadcastStream();
  // 1つ目の購読
  stream.listen((value) => print('Listener 1: $value'));
  // 2つ目の購読(エラーにならない)
  stream.listen((value) => print('Listener 2: $value'));
}

購読を停止する

購読を結果を待たずに途中で停止するにはlistenの返り値を保持しておきStreamSubscription型からcancelメソッドを呼び出します。

final subscription = stream.listen((value) => print('Listener 1: $value'));
// 購読を停止する
subscription.cancel();

await for

listenを使用せずにデータを参照するにはawait forを使用します。Streamから流れてくるデータをコレクションの様に取り出しながら処理することが可能です。

void main() async {
  await for (final value in delayedStream()) {
    print('受信: $value');
  }
}

エラーはそのままではキャッチできないのでtry ~ catch文を使用する必要があります。

try {
  await for (final value in delayedStream()) {
    print('受信: $value');
  }
} catch (e) {
  print('エラーが発生しました: $e');
}

カスタムStreamを自作する

カスタムでStreamを作成する方法はいくつか存在します。

async* / yield

Streamasync*/yieldを使用して自作することも可能です。async*非同期ストリームを返す関数であることを定義し、yield値をストリームに流します(emit)

/// Stream関数
Stream<int> delayedStream() async* {
  // インクリメントしながら3回値を流す
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: 1));
    // 値をemit
    yield i;
  }
}

yield*で別のStreamをそのまま流すことも可能です。

Stream<int> numbers1to3() async* {
  // 他のストリームを流す
  yield* Stream.fromIterable([1, 2, 3]); 
}

throw ExceptionではなくFuture.error()を使用してエラーを発生させたい場合awaitキーワードを付与する必要があります。これを付与し忘れるとエラーがスローされないので注意してください。

/// Stream関数
Stream<int> delayedStream() async* {
  // インクリメントしながら3回値を流す
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: 1));
    if (i == 3) {
      await Future.error("ERROR");
    } else {
      yield i;
    } 
  }
}

periodic

periodic一定の時間ごとにデータを発行できるタイマー付きのStreamを作成することも可能です。引数periodには発行したい間隔Durationで指定し、computationに実際にデータを流すための処理を記述します。


Stream<T> Stream.periodic(
  Duration period,
  [T computation(int count)?]
)

使用する際は無限にデータが流れてしまうためtakecancelを使用して処理をSTOPさせるようにする必要があるので注意してください。

final stream = Stream.periodic(Duration(seconds: 1), (count) => count);

await for (var value in stream.take(5)) {
  print('受信: $value');
}

StreamController

StreamController任意のタイミングでイベントを操作できる管理クラスです。addaddErrorcloseでデータやエラー、完了などを通知することが可能です。

import 'dart:async';

final controller = StreamController<bool>();

// イベントを流す
controller.add(false);

// 購読
controller.stream
  .listen((event) { 
    print('受信: $event');
  });

// 明示的に閉じる(完了通知)
controller.close();

購読したい場合はstreamプロパティからStream型を参照することができます。

sinkメソッドで責務を分離

StreamControllerでは直接addなどが可能ですがsinkを使用することで入力用のポートを分離させることができます。要は管理クラスなどを作成した際に読み取り用と書き込み用を分けて実装することが可能です。

class FlagService {
  final _controller = StreamController<bool>();
  
  /// 観測用のStream(読み取り用)
  Stream<bool> get stream => _controller.stream;
  /// イベント発火用のSink(書き込み用)
  Sink<bool> get sink => _controller.sink;

  void send(bool flag) {
    sink.add(flag);
  }

  void dispose() {
    sink.close();
  }
}

Sinkからの操作(addaddErrorcloseなど)はほとんど変わらずに使用することができます。

まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。

ご覧いただきありがとうございました。

searchbox

スポンサー

ProFile

ame

趣味:読書,プログラミング学習,サイト制作,ブログ

IT嫌いを克服するためにITパスを取得しようと勉強してからサイト制作が趣味に変わりました笑
今はCMSを使わずこのサイトを完全自作でサイト運営中〜

New Article