しおメモ

雑多な技術系ブログです。

RxSwiftのSchedulerの実装を読んだ

少し前に、RxSwift5のSchedulerの実装を読んでみたので、初めての読んだ系記事です。

概要

RxSwiftでは、observeOnsubscribeOnで、流れてくるものに対する操作を行うスレッドやキューを指定できます。
それらの実行場所を指定するものがSchedulerクラス群になっていて、内部実装でDispatchQueueOperationQueueをラップしています。

observeOnとsubscribeOn

observeOnは、指定したschedulerで処理を実行するようなObservableを返すメソッドです。

Observable.just(Thread.isMainThread)
    .map { _ in print(Thread.isMainThread) }
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .subscribe(onNext: {
        print(Thread.isMainThread)
    })
    .disposed(by: disposeBag)

このようにすると、observeOn以降のprintDispatchQueue.globalで実行されるので、出力はtruefalseになります。

subscribeOnsubscribeとその解除を指定したスケジューラーで行うようなObservableを返すメソッドです。

Observable.just(Thread.isMainThread)
    .map { _ in print(Thread.isMainThread) }
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .subscribe(onNext: {
        print(Thread.isMainThread)
    })
    .disposed(by: disposeBag)

例えばこのように流すと、doもサブスレッドで行われるため、出力は両方ともfalseになります。

Serial/Concurrent Scheduler

ややこしいですが、Serial Scheduler(SerialDispatchQueueScheduler)は並列なDispatchQueueが渡された場合にも、実行順を保証するために、キューを直列に作り直します。

それに対して、Concurrent Schedulerは並列キューをそのまま実行してもに問題ない場合に用いられます。
こちらの場合、パフォーマンス的に有利になります。

主な組み込みのスケジューラー

MainScheduler

DispatchQueue.mainのラッパーになっているので、メインスレッドでasyncで実行されます。
しかし、これはSerialDispatchQueueSchedulerを実装しているので、直列スケジューラーです。

UIの更新などの、observeOn向けの実装になっています。

ConcurrentMainScheduler

こちらもDispatchQueue.mainのラッパーですが、こちらは並列スケジューラーで、subscribeOn用の実装になっています。

SerialDispatchQueueScheduler

DispatchQueue.globalのラッパーで直列スケジューラーです。

ConcurrentDispatchQueueScheduler

DispatchQueue.globalのラッパーで並列スケジューラーです。
グローバルキューなので、QoSを指定することが出来ます。

明示的にサブスレッドで動かしたい際は、通常こちらを利用することになります。