【Swift/Combine】receive(on:)/subscribe(on:)の使い方!スレッドを変更する

この記事からわかること
- SwiftのCombineフレームワーク
- receive(on:)/subscribe(on:)メソッドの使い方
- Publisherの実行スレッドを変更する方法
index
[open]
\ アプリをリリースしました /
環境
- Xcode:15.0.1
- iOS:17.1
- Swift:5.9
- macOS:Sonoma 14.1
パイプラインのスレッドを変更する
Combineでパイプラインのスレッドを変更するためのメソッドとしてreceive(on:)
とsubscribe(on:)
があります。両方ともPublisher
から呼び出し引数にSchedulerを指定します。
Schedulerとは処理を実行するスレッドを管理するための仕組みです。ネットワークリクエストやCPU負荷の高い計算などバックグラウンドスレッドで実行したい処理やUI更新などのメインスレッドで実行したい処理を適切なスレッドで実行するために重要になってきます。
receive(on:)
func receive<S>(
on scheduler: S,
options: S.SchedulerOptions? = nil
) -> Publishers.ReceiveOn<Self, S> where S : Scheduler
receive(on:)
はPublisherから流れてくる値やイベントがどのScheduler上で受け取られるかを制御します。receive(on:)
オペレータが指定された位置から後の部分に影響を与えます。
let publisher = Deferred {
Future<String, Never> { promise in
print("Future1 Thread: \(Thread.current)") // メインスレッド
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
print("Future2 Thread: \(Thread.current)") // メインスレッド
promise(.success("成功したよ"))
}
}
}
publisher
// 後続の処理をバックグラウンドスレッドで実行させる
.receive(on: DispatchQueue.global(qos: .background))
.flatMap { value in
print("FlatMap Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
return Just(value)
}.sink { value in
print("Sink Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
}
.store(in: &cancellables)
実行させてみるとPublisherはメインスレッドで、後続のflatMapやsinkなどはバックグラウンドスレッドで行われていることがわかります。このようにオペレーターを呼び出した以降の処理が全て指定したスレッドになります。
Future1 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main}
Future2 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main}
FlatMap Thread: <NSThread: 0x60000170d800>{number = 6, name = (null)} + 成功したよ
Sink Thread: <NSThread: 0x60000170d800>{number = 6, name = (null)} + 成功したよ
subscribe(on:)
func subscribe<S>(
on scheduler: S,
options: S.SchedulerOptions? = nil
) -> Publishers.SubscribeOn<Self, S> where S : Scheduler
subscribe(on:)
はPublisherがどのScheduler上で実行されるかを制御します。subscribe(on:)
オペレータが指定された位置から前の部分に影響を与えます。
let publisher = Deferred {
Future<String, Never> { promise in
print("Future1 Thread: \(Thread.current)") // バックグラウンドスレッド
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
print("Future2 Thread: \(Thread.current)") // メインスレッド
promise(.success("成功したよ"))
}
}
}
publisher
.handleEvents(receiveSubscription: { value in
print("購読開始 Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
})
.subscribe(on: DispatchQueue.global(qos: .background))
.flatMap { value in
print("FlatMap Thread: \(Thread.current) + \(value)") // メインスレッド
return Just(value)
}.sink { value in
print("Sink Thread: \(Thread.current) + \(value)") // メインスレッド
}
.store(in: &cancellables)
実行させてみると明示的にスレッドを指定したところ以外はPublisherはバックグラウンドスレッドで、後続のflatMapやsinkなどはメインスレッドで行われていることがわかります。このようにオペレーターを呼び出した以前の処理が全て指定したスレッドになります。
Future1 Thread: <NSThread: 0x60000170ce80>{number = 7, name = (null)}
購読開始 Thread: <NSThread: 0x60000170ce80>{number = 7, name = (null)} + Future
Future2 Thread: <_NSMainThread: 0x60000170c000>{number = 1, name = main}
FlatMap Thread: <_NSMainThread: 0x60000170c000>{number = 1, name = main} + 成功したよ
Sink Thread: <_NSMainThread: 0x60000170c000>{number = 1, name = main} + 成功したよ
スレッドをこまめに切り替える
receive(on:)
とsubscribe(on:)
をこまめに使用することでスレッドを用途ごとに切り替えることができます。
publisher
.handleEvents(receiveSubscription: { value in
print("購読開始 Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
})
.subscribe(on: DispatchQueue.global(qos: .background))
.handleEvents(receiveOutput: { value in
print("Output1 Thread: \(Thread.current) + \(value)") // メインスレッド
})
.receive(on: DispatchQueue.main)
.handleEvents(receiveOutput: { value in
print("Output2 Thread: \(Thread.current) + \(value)") // メインスレッド
})
.receive(on: DispatchQueue.global(qos: .background))
.flatMap { value in
print("FlatMap Thread: \(Thread.current) + \(value)") // バックグラウンドスレッド
return Just(value)
}
.receive(on: DispatchQueue.main)
.sink { value in
print("Sink Thread: \(Thread.current) + \(value)") // メインスレッド
}
.store(in: &cancellables)
Future1 Thread: <NSThread: 0x600001708400>{number = 6, name = (null)}
購読開始 Thread: <NSThread: 0x600001708400>{number = 6, name = (null)} + Future
Future2 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main}
Output1 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main} + 成功したよ
Output2 Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main} + 成功したよ
FlatMap Thread: <NSThread: 0x60000170e440>{number = 2, name = (null)} + 成功したよ
Sink Thread: <_NSMainThread: 0x600001708000>{number = 1, name = main} + 成功したよ
RxSwiftのsubscribeOn/observeOn
RxSwiftにも同じような役割のメソッドが用意されています。
- RxSwift subscribeOn → Combine subscribe(on:)
- RxSwift observeOn → Combine receive(on:)
以下記事はRxJavaですがsubscribeOn/observeOnの使い方の参考にしてください。
まだまだ勉強中ですので間違っている点や至らぬ点がありましたら教えていただけると助かります。
ご覧いただきありがとうございました。