読者です 読者をやめる 読者になる 読者になる

RxSwift の スケジューラ読んでる

Observableみたいなものはとてもシンプルなつくりではあるのですが、それでもRxの亜種ではなく、元祖Rxを使うメリットはたいへん大きくて、主な理由はSchedulerやDisposableのような強力な部品があるからです。これらの部品たちは、他言語をまたいで同じ概念が移植され、その環境に合った形で生き生きと活動しているので、一度手になじめばどこへ行っても使えるところがさらに価値を高めているかんじです。

とくに、iosなどのネイティブアプリでは、並列に処理を実行することが許されているため、スケジューラの存在感はなかなかに大きいです。

もちろんiosはスレッドプール上で走るタスクキューの仕組み(GCDなど)を持っていますが、複数のタスクを組み合わせたり、エラー処理やキャンセルをしたいときには、さらなる構文的なサポートが欲しくなるところです。UIスレッドでイベントを受け、時間がかかる処理をバックグラウンドで実行、終わったら結果をUIスレッドに戻す、というやつが基本的なパターンですが、ここに時間の制御が絡んできたり、直列ではなく並列に実行したいものがあったり、それらの完了を待ち合わせたかったりすると、コールバックだけでは対応するのが難しくなってきます。さらに、エラー処理やキャンセルも絡んでくると、もはや綺麗に書くかパーフェクトに書くか2つに1つのトレードオフみたいな様相になってきてしまいがちです。

そこへいくと、Rxがあればスケジューラを持っているので、非同期処理の待ち合わせや組み合わせはすべてメソッドチェインのみで対応できてめちゃ楽かつ安全です。キューもタイマーも、subscribeを止めるだけですべて正確にキャンセルできるしエラー処理のサポートもあります。

ReactiveCocoa なんかも、新しい版からはかなりRxの影響が濃くなって、Schedulerが搭載されるようになってるみたいですね。

Serial と Concurrent

RxSwiftのドキュメントはセマンティクス的な話が充実していておもしろいです。 Schedulerについてはここに解説があります。 https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Schedulers.md

まず、RxSwiftのスケジューラには、SerialなものとConcurrentなものがあると説明されています。 これは、同じスケジューラ上で同時に複数のObservableの値を流したときに違いがでます。

イメージ図

Serial

A  -x--x--x--->
B  --x--x--x-->
C  ---x--x--x->

Concurrent

A  -x--x--x->
B  -x--x--x->
C  -x--x--x->

RxSwift が並列なスケジューラをつくれる理由は、Swiftの実行環境が並列処理をサポートしているからです。

Rxのスケジューラーは、基本的には動作する環境に標準で用意されているスケジューリングの仕組みをRxで扱えるようにしてくれるものです。言語が違えばサポートされるスケジューラの種類も変わります。

たとえばiosでは、標準でキューをつくるときにオプションでSerialかConccurrentか指定できるようになってます。(他、常に利用できるグローバルキューのうち、メインキューはSerial、バックグラウンドがConcurrent)

// 直列に実行されるキュー(デフォルト)
let serialQueue = dispatch_queue_create("playground:serial", nil)

var a = 0

for i in (0...3) {
    dispatch_async(serialQueue) {
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
        a += 1
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
    }
}

// 直列に実行されることが保証される
// task:0 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:0
// task:0 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:1
// task:1 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:1
// task:1 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:2
// task:2 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:2
// task:2 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:3
// task:3 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:3
// task:3 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:4
// 並列に実行されるキュー
let concurrentQueue = dispatch_queue_create("playground:concurrent", DISPATCH_QUEUE_CONCURRENT)

for i in (0...3) {
    dispatch_async(concurrentQueue) {
        // ※スレッドセーフではない
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
        a += 1
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
    }
}

// 並列に実行される。排他制御をしていないので、スレッド競合が起きてる
task:0 <NSThread: 0x7fa39ad32a80>{number = 3, name = (null)} a:0
task:2 <NSThread: 0x7fa39ae11be0>{number = 5, name = (null)} a:0
task:1 <NSThread: 0x7fa39ae10f70>{number = 4, name = (null)} a:0
task:0 <NSThread: 0x7fa39ad32a80>{number = 3, name = (null)} a:1
task:1 <NSThread: 0x7fa39ae10f70>{number = 4, name = (null)} a:3
task:2 <NSThread: 0x7fa39ae11be0>{number = 5, name = (null)} a:3

Concurrentなキューに投げたほうのタスクは、複数のスレッドにふりわけられて実行されました。 上の例だと、値を1つずつインクリメントしたはずなのに、0からいきなり3になっているタスクがありますね。GCDで抽象化されているとはいえマルチスレッドで動いているため、共有変数に同時に書き込みしちゃうとスレッド競合が起こるようです。

Rx の スケジューラでおなじことしてみましょう。

let concurrentQueue = dispatch_queue_create("playground:concurrent", DISPATCH_QUEUE_CONCURRENT)
let concurrentScheduler = ConcurrentDispatchQueueScheduler(queue: concurrentQueue)

var a = 0
(0..<5).toObservable()
    .observeOn(concurrentScheduler)
    .doOnNext { i in
        print("i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

// i:0 <NSThread: 0x7fbe60732700>{number = 3, name = (null)} before a:0
// i:0 <NSThread: 0x7fbe60732700>{number = 3, name = (null)} after a:1
// i:1 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} before a:1
// i:1 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} after a:2
// i:2 <NSThread: 0x7fbe6043a6d0>{number = 5, name = (null)} before a:2
// i:2 <NSThread: 0x7fbe6043a6d0>{number = 5, name = (null)} after a:3
// i:3 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} before a:3
// i:3 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} after a:4
// i:4 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} before a:4
// i:4 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} after a:5

複数のスレッド上で処理されましたが、直列に実行されています。ドキュメントによると、1つのObservableを流れる値は、流れた順番に処理されることが保証されると書かれています。 ( implicit-observable-guaarantees )

もちろん、Observableを複数同時に同じスケジューラ上で走らせることで、並列に実行されます。

var a = 0

(0..<3).toObservable()
    .observeOn(concurrentScheduler)
    .doOnNext { i in
        print("Observable A i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("Observable A i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

(3..<6).toObservable()
    .observeOn(concurrentScheduler)
    .doOnNext { i in
        print("Observable B i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("Observable B i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

// Observable A i:0 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:0
// Observable A i:0 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:1
// Observable B i:3 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:1
// Observable A i:1 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} before a:1
// Observable B i:3 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:3
// Observable B i:4 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:3
// Observable A i:1 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} after a:4
// Observable B i:4 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:4
// Observable A i:2 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} before a:4
// Observable A i:2 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} after a:5
// Observable B i:5 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:5
// Observable B i:5 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:6

ここでもやはり、共有変数の書き込みをすると競合するので、こういうのがどうしても必要なときは注意です。 ただし、自分で排他制御を書かなくても、必要なところだけSerialなスケジューラに切り替えるだけで良いと思います。

(0..<3).toObservable()
    .observeOn(concurrentScheduler)
    // 並列に実行したい処理
    .observeOn(serialScheduler)
    // 直列に実行したい処理
    .doOnNext { i in
        print("Observable A i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("Observable A i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

ImmediateSchedulerType と SchedulerType

というわけで、1つのストリームを流れる値は順番が保証されていることが確認できたので、これがどのように実現されているのかちょっと見ていこうと思います。

まず、RxSwift のスケジューラは全て ImmediateSchedulerType か SchdulerType プロトコルを継承しています。 2つのプロトコルインターフェイス上の違いは、SchedulerType の方だけ時間に関係するメソッドなどが追加されている点です。プロトコルエクステンションで暗黙に追加される メソッドも、SchedulerType のほうには時間制御つきのものなどが生えます。

SchedulerType は ImmediateScheduelrType を継承しているので、どちらのプロトコルを継承したスケジューラも、渡されたアクションを単純にスケジュールするschedule メソッドが使える点では同じです。

scheudle メソッドは、 StateType -> Disposable という型の、値を1つとるdispose可能なアクションを受けとって、一回実行されるようにスケジュールするメソッドです。 たとえば GCD ベースのスケジューラのschedule実装は、dispathc_async でキューに入れるだけになっていました。

    func scheduleInternal<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        
        dispatch_async(_queue) {
            if cancel.disposed {
                return
            }
            
            cancel.disposable = action(state)
        }
        
        return cancel
    }

ここはとくに排他制御は入っていませんでした。順番を保証するのはObservableごとなので、スケジューラ自体は順番を保証することはしていないみたいです。

他、ImmediateSchedulerType と SchedulerType はそれぞれ、プロトコルエクステンションによって scheduleRecursive というメソッドが暗黙的に利用できるようになっています。これは、再帰を利用して schedule された処理を繰り返し実行するための仕組みです。この機能は内部の排他制御の場面でも出てきます。

スピンロックとミューテックスベースのロック

RxSwift の中を見ると、排他制御にはおもに ミューテックスベースのNSRecursiveLockの他 、スピンロックや OSAtomicIncrement が使われています。

Cocoaで利用できるロックの種類は スレッドプログラミングガイド に載っていました。ロックやアトミック操作はOSレベルで提供されているものなので、他プログラミング言語にあるやつと同じものがだいたい使えるようです。

NSRecursiveLock は再帰ロックというもので、1つのスレッドが複数回取得可能なロックとのことです。考えてみると、1つのスレッドで同じ行を複数回通る状況は再帰しかありえないので、再帰が必要な場合のみ使うっていう意味らしい。

スピンロックは、ロックが取得できるまで無限ループ的なもので待機するという単純なつくりで、オーバーヘッドが最も少ないかわりに、待機時間が長くなるとCPUリソースをたくさん消費してしまうという特徴があるとあります。また、C#のドキュメントは、スピンロックは通常のロックより速くなることが確認できた場合のみ使うべし。と言っています。 RxSwift は、使える場所ではなかり積極的にスピンロックをつかっていました。後で見てみようと思いますが、その辺のテクニックはとてもおもしろいです。

OSAtomicIncrement は、整数などの読み書きをアトミックに呼び出せる機能で、RxSwift ではDisposeの単純なフラグ管理などで見られます。

observeOn の実装

ストリームを流れる値の順番の保証は、各オペレータごとに実装されているようなので、ここからは、observeOn の中身に注目して見てみます。

observeOn オペレーターは、スケジューラを受け取って、以降の値がそのスケジューラを走るようにきりかえるオペレータです。上で試したとおり、Concurrentなスケジューラを渡しても順番が保証されます。

observeOn の中を見ると、値が流れるときにロックをかける、ということは やっていません でした。 かわりに、scheduleRecursive を使って、タイミングが競合した場合に、とりあえず1つのスレッドだけに処理を許し、繰り返しscheduleを再帰的に呼ぶことで漏れなく処理するような仕組みになっており、ロックのオーバーヘッドを最小にするテクニックがつかわれていました(!)

observeOnに値が流れてきたときの処理は以下のようになっています。

   // RxSwift/Observables/Implementations/ObserveOn.swift:64

    override func onCore(event: Event<E>) {
        let shouldStart = _lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)
            
            switch self._state {
            case .Stopped:
                self._state = .Running
                return true
            case .Running:
                return false
            }
        }
        
        if shouldStart {
            _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

ここでの _lock はスピンロックです。値が流れてきたら、スピンロックをかけてから、内部的にもっている単純なキューに値を入れています。キューに入れる操作は明らかに実行時間が短い上、他の処理がはさまる余地もないため、スピンロックが問題になることはなさそうですね。

そして、ロックされているスコープを抜ける前に、別スレッドがschedule処理を実行中かどうかを確認し、誰も実行していない (.Stoppedな) 場合のみ、Running状態にします。ここまででロック終わり。値をキューにいれつつ、同時に Runnning になるスレッドを1つに制御するためにロックを使っています。

ロックされたスコープを抜けると、.Running 状態を取得できていたスレッドだけが、scheduleRecursive で run に行きます 。タイミングが競合してしまったスレッドは何もしないで抜けています。その場合もキューには値を入れているので後々処理されるのかな(予想)。

run を見てみましょう。

   func run(state: Void, recurse: Void -> Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<E>?, O?) in
            if self._queue.count > 0 {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .Stopped
                return (nil, self._observer)
            }
        }
        
        if let nextEvent = nextEvent {
            observer?.on(nextEvent)
            if nextEvent.isStopEvent {
                dispose()
            }
        }
        else {
            return
        }
        
        let shouldContinue = _shouldContinue_synchronized()
        
        if shouldContinue {
            recurse()
        }
    }

ここは scheduleRecursive 経由で呼ばれるメソッドなので、 recurse という関数を受けとることができます。これを呼び出すことによって、スケジュールされたアクション(つまり run メソッド自体)を再帰的にもう一度スケジュールすることができる仕組みになっています。

最初にキューに入っている値を読んでいますが、ここもスピンロックをかけてから取り出していますね。

そして下へ行くと、 observer?.on() で実際にイベントを処理するコードが出てきます。が、おもしろいことに、この一番クリティカルセクションくさい重要な場所ではロックが取得されていません。

最期に、 _shouldContinue_syncronized() で、もう一度 run を実行する必要があるかチェックします。

    func _shouldContinue_synchronized() -> Bool {
        _lock.lock(); defer { _lock.unlock() } // {
            if self._queue.count > 0 {
                return true
            }
            else {
                self._state = .Stopped
                return false
            }
        // }
    }

予想どおり、キューが空になっているかチェックして、空でない場合に recurse() し、キューに余っている値の処理が実行されます。

まとめると、observeOn は、スレッド競合した場合に再帰的に何度もスケジュールをすることでタイミングを合わせる、といった使いかたがされてます。すげえ。

わかったこと

  • 1つのObservableを流れる値の順番は保証されている
  • しかし、よく使うobserveOnなどでは、 on(event: Event) の中をロックするような単純な実装ではなく、
  • 値をキューに一旦いれ、キューの出し入れ部分のみをクリティカルセクションにすることで実行時間が予想外に長くなることがなくなり、スピンロックが使えるという最適化が入っている
  • 値の流れるタイミングが競合してしまった場合は、ひとつのスレッドだけが処理を開始。キューがなくなるまで再帰で繰り返しスケジュールする

感想: かっこいい