【Swift/Combine】receive(on:)/subscribe(on:)の使い方!スレッドを変更する

【Swift/Combine】receive(on:)/subscribe(on:)の使い方!スレッドを変更する

この記事からわかること

  • SwiftCombineフレームワーク
  • receive(on:)/subscribe(on:)メソッド使い方
  • Publisher実行スレッドを変更する方法

index

[open]

\ アプリをリリースしました /

みんなの誕生日

友達や家族の誕生日をメモ!通知も届く-みんなの誕生日-

posted withアプリーチ

公式リファレンス:Combine Framework

環境

パイプラインのスレッドを変更する

Combineでパイプラインのスレッドを変更するためのメソッドとしてreceive(on:)subscribe(on:)があります。両方ともPublisherから呼び出し引数にSchedulerを指定します。

Schedulerとは処理を実行するスレッドを管理するための仕組みです。ネットワークリクエストやCPU負荷の高い計算などバックグラウンドスレッドで実行したい処理やUI更新などのメインスレッドで実行したい処理を適切なスレッドで実行するために重要になってきます。

receive(on:)

公式リファレンス:receive(on:)

func receive<S>(
    on scheduler: S,
    options: S.SchedulerOptions? = nil
) -> Publishers.ReceiveOn<Self, S> where S : Scheduler

receive(on:)Publisherから流れてくる値やイベントがどのScheduler上で受け取られるかを制御します。receive(on:)オペレータが指定された位置から後の部分に影響を与えます

let publisher = Deferred {
    Future<String, Never> { promise in
        print("Future1 Thread: \(Thread.current)") // メインスレッド
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            print("Future2 Thread: \(Thread.current)") // メインスレッド
            promise(.success("成功したよ"))
        }
    }
}

publisher
    // 後続の処理をバックグラウンドスレッドで実行させる
    .receive(on: DispatchQueue.global(qos: .background))
    .flatMap { value in
        print("FlatMap Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
        return Just(value)
    }.sink { value in
        print("Sink Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
    }
    .store(in: &cancellables)

実行させてみるとPublisherはメインスレッドで、後続のflatMapやsinkなどはバックグラウンドスレッドで行われていることがわかります。このようにオペレーターを呼び出した以降の処理が全て指定したスレッドになります。

Future1 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main}
Future2 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main}
FlatMap Thread: <NSThread: 0x60000170d800>{number = 6, name = (null)} + 成功したよ
Sink Thread: <NSThread: 0x60000170d800>{number = 6, name = (null)} + 成功したよ

subscribe(on:)

公式リファレンス:subscribe(on:)

func subscribe<S>(
    on scheduler: S,
    options: S.SchedulerOptions? = nil
) -> Publishers.SubscribeOn<Self, S> where S : Scheduler

subscribe(on:)PublisherがどのScheduler上で実行されるかを制御します。subscribe(on:)オペレータが指定された位置から前の部分に影響を与えます

let publisher = Deferred {
    Future<String, Never> { promise in
        print("Future1 Thread: \(Thread.current)") // バックグラウンドスレッド
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            print("Future2 Thread: \(Thread.current)") // メインスレッド
            promise(.success("成功したよ"))
        }
    }
}

publisher
    .handleEvents(receiveSubscription: { value in
        print("購読開始 Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
    })
    .subscribe(on: DispatchQueue.global(qos: .background))
    .flatMap { value in
        print("FlatMap Thread: \(Thread.current) + \(value)") // メインスレッド
        return Just(value)
    }.sink { value in
        print("Sink Thread: \(Thread.current) + \(value)") // メインスレッド
    }
    .store(in: &cancellables)

実行させてみると明示的にスレッドを指定したところ以外はPublisherはバックグラウンドスレッドで、後続のflatMapやsinkなどはメインスレッドで行われていることがわかります。このようにオペレーターを呼び出した以前の処理が全て指定したスレッドになります。

Future1 Thread: <NSThread: 0x60000170ce80>{number = 7, name = (null)}
購読開始 Thread: <NSThread: 0x60000170ce80>{number = 7, name = (null)} + Future
Future2 Thread: <_NSMainThread: 0x60000170c000>{number = 1, name = main}
FlatMap Thread: <_NSMainThread: 0x60000170c000>{number = 1, name = main} + 成功したよ
Sink Thread: <_NSMainThread: 0x60000170c000>{number = 1, name = main} + 成功したよ

スレッドをこまめに切り替える

receive(on:)subscribe(on:)をこまめに使用することでスレッドを用途ごとに切り替えることができます。

publisher
    .handleEvents(receiveSubscription: { value in
        print("購読開始 Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
    })
    .subscribe(on: DispatchQueue.global(qos: .background))
    .handleEvents(receiveOutput: { value in
        print("Output1 Thread: \(Thread.current) + \(value)") // メインスレッド
    })
    .receive(on: DispatchQueue.main)
    .handleEvents(receiveOutput: { value in
        print("Output2 Thread: \(Thread.current) + \(value)") // メインスレッド
    })
    .receive(on: DispatchQueue.global(qos: .background))
    .flatMap { value in
        print("FlatMap Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
        return Just(value)
    }
    .receive(on: DispatchQueue.main)
    .sink { value in
        print("Sink Thread: \(Thread.current) + \(value)") // メインスレッド
    }
    .store(in: &cancellables)
Future1 Thread: <NSThread: 0x600001708400>{number = 6, name = (null)}
購読開始 Thread: <NSThread: 0x600001708400>{number = 6, name = (null)} + Future
Future2 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main}
Output1 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main} + 成功したよ
Output2 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main} + 成功したよ
FlatMap Thread: <NSThread: 0x60000170e440>{number = 2, name = (null)} + 成功したよ
Sink Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main} + 成功したよ

RxSwiftのsubscribeOn/observeOn

RxSwiftにも同じような役割のメソッドが用意されています。

以下記事はRxJavaですがsubscribeOn/observeOnの使い方の参考にしてください。

まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。

ご覧いただきありがとうございました。

searchbox

スポンサー

ProFile

ame

趣味:読書,プログラミング学習,サイト制作,ブログ

IT嫌いを克服するためにITパスを取得しようと勉強してからサイト制作が趣味に変わりました笑
今はCMSを使わずこのサイトを完全自作でサイト運営中〜

New Article

index