読者です 読者をやめる 読者になる 読者になる

RxSwift の スケジューラ読んでる

Observableみたいなものはとてもシンプルなつくりではあるのですが、それでもRxの亜種ではなく、元祖Rxを使うメリットはたいへん大きくて、主な理由はSchedulerやDisposableのような強力な部品があるからです。これらの部品たちは、他言語をまたいで同じ概念が移植され、その環境に合った形で生き生きと活動しているので、一度手になじめばどこへ行っても使えるところがさらに価値を高めているかんじです。

とくに、iosなどのネイティブアプリでは、並列に処理を実行することが許されているため、スケジューラの存在感はなかなかに大きいです。

もちろんiosはスレッドプール上で走るタスクキューの仕組み(GCDなど)を持っていますが、複数のタスクを組み合わせたり、エラー処理やキャンセルをしたいときには、さらなる構文的なサポートが欲しくなるところです。UIスレッドでイベントを受け、時間がかかる処理をバックグラウンドで実行、終わったら結果をUIスレッドに戻す、というやつが基本的なパターンですが、ここに時間の制御が絡んできたり、直列ではなく並列に実行したいものがあったり、それらの完了を待ち合わせたかったりすると、コールバックだけでは対応するのが難しくなってきます。さらに、エラー処理やキャンセルも絡んでくると、もはや綺麗に書くかパーフェクトに書くか2つに1つのトレードオフみたいな様相になってきてしまいがちです。

そこへいくと、Rxがあればスケジューラを持っているので、非同期処理の待ち合わせや組み合わせはすべてメソッドチェインのみで対応できてめちゃ楽かつ安全です。キューもタイマーも、subscribeを止めるだけですべて正確にキャンセルできるしエラー処理のサポートもあります。

ReactiveCocoa なんかも、新しい版からはかなりRxの影響が濃くなって、Schedulerが搭載されるようになってるみたいですね。

Serial と Concurrent

RxSwiftのドキュメントはセマンティクス的な話が充実していておもしろいです。 Schedulerについてはここに解説があります。 https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Schedulers.md

まず、RxSwiftのスケジューラには、SerialなものとConcurrentなものがあると説明されています。 これは、同じスケジューラ上で同時に複数のObservableの値を流したときに違いがでます。

イメージ図

Serial

A  -x--x--x--->
B  --x--x--x-->
C  ---x--x--x->

Concurrent

A  -x--x--x->
B  -x--x--x->
C  -x--x--x->

RxSwift が並列なスケジューラをつくれる理由は、Swiftの実行環境が並列処理をサポートしているからです。

Rxのスケジューラーは、基本的には動作する環境に標準で用意されているスケジューリングの仕組みをRxで扱えるようにしてくれるものです。言語が違えばサポートされるスケジューラの種類も変わります。

たとえばiosでは、標準でキューをつくるときにオプションでSerialかConccurrentか指定できるようになってます。(他、常に利用できるグローバルキューのうち、メインキューはSerial、バックグラウンドがConcurrent)

// 直列に実行されるキュー(デフォルト)
let serialQueue = dispatch_queue_create("playground:serial", nil)

var a = 0

for i in (0...3) {
    dispatch_async(serialQueue) {
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
        a += 1
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
    }
}

// 直列に実行されることが保証される
// task:0 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:0
// task:0 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:1
// task:1 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:1
// task:1 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:2
// task:2 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:2
// task:2 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:3
// task:3 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:3
// task:3 <NSThread: 0x7f8568c38140>{number = 3, name = (null)} a:4
// 並列に実行されるキュー
let concurrentQueue = dispatch_queue_create("playground:concurrent", DISPATCH_QUEUE_CONCURRENT)

for i in (0...3) {
    dispatch_async(concurrentQueue) {
        // ※スレッドセーフではない
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
        a += 1
        print("task:\(i) \(NSThread.currentThread()) a:\(a)")
    }
}

// 並列に実行される。排他制御をしていないので、スレッド競合が起きてる
task:0 <NSThread: 0x7fa39ad32a80>{number = 3, name = (null)} a:0
task:2 <NSThread: 0x7fa39ae11be0>{number = 5, name = (null)} a:0
task:1 <NSThread: 0x7fa39ae10f70>{number = 4, name = (null)} a:0
task:0 <NSThread: 0x7fa39ad32a80>{number = 3, name = (null)} a:1
task:1 <NSThread: 0x7fa39ae10f70>{number = 4, name = (null)} a:3
task:2 <NSThread: 0x7fa39ae11be0>{number = 5, name = (null)} a:3

Concurrentなキューに投げたほうのタスクは、複数のスレッドにふりわけられて実行されました。 上の例だと、値を1つずつインクリメントしたはずなのに、0からいきなり3になっているタスクがありますね。GCDで抽象化されているとはいえマルチスレッドで動いているため、共有変数に同時に書き込みしちゃうとスレッド競合が起こるようです。

Rx の スケジューラでおなじことしてみましょう。

let concurrentQueue = dispatch_queue_create("playground:concurrent", DISPATCH_QUEUE_CONCURRENT)
let concurrentScheduler = ConcurrentDispatchQueueScheduler(queue: concurrentQueue)

var a = 0
(0..<5).toObservable()
    .observeOn(concurrentScheduler)
    .doOnNext { i in
        print("i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

// i:0 <NSThread: 0x7fbe60732700>{number = 3, name = (null)} before a:0
// i:0 <NSThread: 0x7fbe60732700>{number = 3, name = (null)} after a:1
// i:1 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} before a:1
// i:1 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} after a:2
// i:2 <NSThread: 0x7fbe6043a6d0>{number = 5, name = (null)} before a:2
// i:2 <NSThread: 0x7fbe6043a6d0>{number = 5, name = (null)} after a:3
// i:3 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} before a:3
// i:3 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} after a:4
// i:4 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} before a:4
// i:4 <NSThread: 0x7fbe6043aba0>{number = 4, name = (null)} after a:5

複数のスレッド上で処理されましたが、直列に実行されています。ドキュメントによると、1つのObservableを流れる値は、流れた順番に処理されることが保証されると書かれています。 ( implicit-observable-guaarantees )

もちろん、Observableを複数同時に同じスケジューラ上で走らせることで、並列に実行されます。

var a = 0

(0..<3).toObservable()
    .observeOn(concurrentScheduler)
    .doOnNext { i in
        print("Observable A i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("Observable A i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

(3..<6).toObservable()
    .observeOn(concurrentScheduler)
    .doOnNext { i in
        print("Observable B i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("Observable B i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

// Observable A i:0 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:0
// Observable A i:0 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:1
// Observable B i:3 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:1
// Observable A i:1 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} before a:1
// Observable B i:3 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:3
// Observable B i:4 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:3
// Observable A i:1 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} after a:4
// Observable B i:4 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:4
// Observable A i:2 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} before a:4
// Observable A i:2 <NSThread: 0x7fbf4b45d4a0>{number = 4, name = (null)} after a:5
// Observable B i:5 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} before a:5
// Observable B i:5 <NSThread: 0x7fbf4b504bc0>{number = 3, name = (null)} after a:6

ここでもやはり、共有変数の書き込みをすると競合するので、こういうのがどうしても必要なときは注意です。 ただし、自分で排他制御を書かなくても、必要なところだけSerialなスケジューラに切り替えるだけで良いと思います。

(0..<3).toObservable()
    .observeOn(concurrentScheduler)
    // 並列に実行したい処理
    .observeOn(serialScheduler)
    // 直列に実行したい処理
    .doOnNext { i in
        print("Observable A i:\(i) \(NSThread.currentThread()) before a:\(a)")
        a += 1
        print("Observable A i:\(i) \(NSThread.currentThread()) after a:\(a)")
    }
    .subscribe { _ in }

ImmediateSchedulerType と SchedulerType

というわけで、1つのストリームを流れる値は順番が保証されていることが確認できたので、これがどのように実現されているのかちょっと見ていこうと思います。

まず、RxSwift のスケジューラは全て ImmediateSchedulerType か SchdulerType プロトコルを継承しています。 2つのプロトコルインターフェイス上の違いは、SchedulerType の方だけ時間に関係するメソッドなどが追加されている点です。プロトコルエクステンションで暗黙に追加される メソッドも、SchedulerType のほうには時間制御つきのものなどが生えます。

SchedulerType は ImmediateScheduelrType を継承しているので、どちらのプロトコルを継承したスケジューラも、渡されたアクションを単純にスケジュールするschedule メソッドが使える点では同じです。

scheudle メソッドは、 StateType -> Disposable という型の、値を1つとるdispose可能なアクションを受けとって、一回実行されるようにスケジュールするメソッドです。 たとえば GCD ベースのスケジューラのschedule実装は、dispathc_async でキューに入れるだけになっていました。

    func scheduleInternal<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        
        dispatch_async(_queue) {
            if cancel.disposed {
                return
            }
            
            cancel.disposable = action(state)
        }
        
        return cancel
    }

ここはとくに排他制御は入っていませんでした。順番を保証するのはObservableごとなので、スケジューラ自体は順番を保証することはしていないみたいです。

他、ImmediateSchedulerType と SchedulerType はそれぞれ、プロトコルエクステンションによって scheduleRecursive というメソッドが暗黙的に利用できるようになっています。これは、再帰を利用して schedule された処理を繰り返し実行するための仕組みです。この機能は内部の排他制御の場面でも出てきます。

スピンロックとミューテックスベースのロック

RxSwift の中を見ると、排他制御にはおもに ミューテックスベースのNSRecursiveLockの他 、スピンロックや OSAtomicIncrement が使われています。

Cocoaで利用できるロックの種類は スレッドプログラミングガイド に載っていました。ロックやアトミック操作はOSレベルで提供されているものなので、他プログラミング言語にあるやつと同じものがだいたい使えるようです。

NSRecursiveLock は再帰ロックというもので、1つのスレッドが複数回取得可能なロックとのことです。考えてみると、1つのスレッドで同じ行を複数回通る状況は再帰しかありえないので、再帰が必要な場合のみ使うっていう意味らしい。

スピンロックは、ロックが取得できるまで無限ループ的なもので待機するという単純なつくりで、オーバーヘッドが最も少ないかわりに、待機時間が長くなるとCPUリソースをたくさん消費してしまうという特徴があるとあります。また、C#のドキュメントは、スピンロックは通常のロックより速くなることが確認できた場合のみ使うべし。と言っています。 RxSwift は、使える場所ではなかり積極的にスピンロックをつかっていました。後で見てみようと思いますが、その辺のテクニックはとてもおもしろいです。

OSAtomicIncrement は、整数などの読み書きをアトミックに呼び出せる機能で、RxSwift ではDisposeの単純なフラグ管理などで見られます。

observeOn の実装

ストリームを流れる値の順番の保証は、各オペレータごとに実装されているようなので、ここからは、observeOn の中身に注目して見てみます。

observeOn オペレーターは、スケジューラを受け取って、以降の値がそのスケジューラを走るようにきりかえるオペレータです。上で試したとおり、Concurrentなスケジューラを渡しても順番が保証されます。

observeOn の中を見ると、値が流れるときにロックをかける、ということは やっていません でした。 かわりに、scheduleRecursive を使って、タイミングが競合した場合に、とりあえず1つのスレッドだけに処理を許し、繰り返しscheduleを再帰的に呼ぶことで漏れなく処理するような仕組みになっており、ロックのオーバーヘッドを最小にするテクニックがつかわれていました(!)

observeOnに値が流れてきたときの処理は以下のようになっています。

   // RxSwift/Observables/Implementations/ObserveOn.swift:64

    override func onCore(event: Event<E>) {
        let shouldStart = _lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)
            
            switch self._state {
            case .Stopped:
                self._state = .Running
                return true
            case .Running:
                return false
            }
        }
        
        if shouldStart {
            _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

ここでの _lock はスピンロックです。値が流れてきたら、スピンロックをかけてから、内部的にもっている単純なキューに値を入れています。キューに入れる操作は明らかに実行時間が短い上、他の処理がはさまる余地もないため、スピンロックが問題になることはなさそうですね。

そして、ロックされているスコープを抜ける前に、別スレッドがschedule処理を実行中かどうかを確認し、誰も実行していない (.Stoppedな) 場合のみ、Running状態にします。ここまででロック終わり。値をキューにいれつつ、同時に Runnning になるスレッドを1つに制御するためにロックを使っています。

ロックされたスコープを抜けると、.Running 状態を取得できていたスレッドだけが、scheduleRecursive で run に行きます 。タイミングが競合してしまったスレッドは何もしないで抜けています。その場合もキューには値を入れているので後々処理されるのかな(予想)。

run を見てみましょう。

   func run(state: Void, recurse: Void -> Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<E>?, O?) in
            if self._queue.count > 0 {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .Stopped
                return (nil, self._observer)
            }
        }
        
        if let nextEvent = nextEvent {
            observer?.on(nextEvent)
            if nextEvent.isStopEvent {
                dispose()
            }
        }
        else {
            return
        }
        
        let shouldContinue = _shouldContinue_synchronized()
        
        if shouldContinue {
            recurse()
        }
    }

ここは scheduleRecursive 経由で呼ばれるメソッドなので、 recurse という関数を受けとることができます。これを呼び出すことによって、スケジュールされたアクション(つまり run メソッド自体)を再帰的にもう一度スケジュールすることができる仕組みになっています。

最初にキューに入っている値を読んでいますが、ここもスピンロックをかけてから取り出していますね。

そして下へ行くと、 observer?.on() で実際にイベントを処理するコードが出てきます。が、おもしろいことに、この一番クリティカルセクションくさい重要な場所ではロックが取得されていません。

最期に、 _shouldContinue_syncronized() で、もう一度 run を実行する必要があるかチェックします。

    func _shouldContinue_synchronized() -> Bool {
        _lock.lock(); defer { _lock.unlock() } // {
            if self._queue.count > 0 {
                return true
            }
            else {
                self._state = .Stopped
                return false
            }
        // }
    }

予想どおり、キューが空になっているかチェックして、空でない場合に recurse() し、キューに余っている値の処理が実行されます。

まとめると、observeOn は、スレッド競合した場合に再帰的に何度もスケジュールをすることでタイミングを合わせる、といった使いかたがされてます。すげえ。

わかったこと

  • 1つのObservableを流れる値の順番は保証されている
  • しかし、よく使うobserveOnなどでは、 on(event: Event) の中をロックするような単純な実装ではなく、
  • 値をキューに一旦いれ、キューの出し入れ部分のみをクリティカルセクションにすることで実行時間が予想外に長くなることがなくなり、スピンロックが使えるという最適化が入っている
  • 値の流れるタイミングが競合してしまった場合は、ひとつのスレッドだけが処理を開始。キューがなくなるまで再帰で繰り返しスケジュールする

感想: かっこいい

RxSwift 読んでる

RxSwiftをちょっとずつ読んでる。最初はちょっと継承構造が激しい印象を受けたけど、基本的に関数型のワードに疎い自分でもとても読みやすいです。 Rxの不思議を理解していくと、アプリケーション内の別のパラダイムと仲良くする方法を考えるときのヒントになったり、まだ見ぬ応用が待っている予感がします。

ObservableとSink

Rxは、関数型言語の知識や並行処理の知識がなくても普通に使えるため、まずはその辺の事柄に目をつむり、 Observable がどう作用しているか理解するところから見ていきます。

さて、 Observable のオペレーターメソッドはなにをしているんでしょうか。 Observable に生えている map やら flatMap やらの各オペレーターメソッドは、新しいObservableを作って返しますが、このとき返るのはもちろん、オペレーターごとの挙動がカスタマイズされた新しい Observable です。RxSwiftでは、これは細かくサブクラスになっています。各オペレーターごとに Observable のサブクラスがあるイメージです。これらのObserable は、見た感じ、subscribe されるまでほぼ何もしません。オペレーターメソッドを呼び出した時点でやることは、新しい Observable をつくり、自身の1つ前の Observable の参照を持つことだけ。ちょうど、 Observableメソッドチェインを表現した、リンクリストを作っているイメージです。

Observable.just(1)      // Observableを返す
  .map { $0 + 1 }       // これもObservable。↑justの参照を持つ。
  .filter { $0 < 10 }   // これもObservable。↑mapの参照を持つ。
  .retry()              // これもObservable。↑filterの参照を持つ。
  .observeOn(scheduelr) // これも ↑
  .take(5)              // これも ↑

ちなみに、ひとつ前の Observable は 内部では一貫して source という名前で参照されます。たとえば上の例では、filtersourcemap です。

各オペレーターメソッドObservable を返しますが、Observable というのはつまるところ subscribe ができるオブジェクトのことです。ということは、メソッドチェインの途中で現れる数々の Observable たちも、内部では subscribe を持っているということになります。すると、それらメソッドチェイン途中の subscribe では何が行なわれるのか? 言い換えると、メソッドチェインの途中の ObservableObserver は何なのか。

その答えが Sink です。 RxSwift には Sink という名前の基底クラスが用意されており、オペレーターの実装はこれによってなされています。 Sink は、ストリームを流れてきた値を自身で処理した後、下の Observer に値を流すという振舞いをします。

class Sink<O : ObserverType> : SingleAssignmentDisposable {
  ...
}

オペレーターのメソッドチェインによって、Observable から Observable を参照するリストができていると上に書きましたが、 Sink によって、今度は Observer のチェインができます。Sink はそれ自身が Observer でもあり、同時に、1つ下のObserverの参照も保持しています。自身の処理を実行したら、今度は1つ下の Observer へ値を流します。( forwardOn )

// subscribeしたときのイメージ図。下から上へ、subscribeしながら遡る

source.source.source.subscribe(Sink(observer: Sink(observer: Sink(observer: observer))))
↑
source.source.subscribe(Sink(observer: Sink(observer: observer)))
↑
source.subscribe(Sink(observer: observer))
↑
subscribe(observer)
// 値が流れてきたときのイメージ図。上から下へ流れる

sink.on(event)
↓
sink.observer.on(event) 
↓
sink.observer.observer.on(event)
↓
sink.observer.observer.observer.on(event)

各オペレータが subscribe されたとき、イメージではだいたい上の図のようなことが起こっています。厳密にいうと例外が多々ありますが、順番に値が流れるように適切にSinkがセットアップされるということについては、だいたいこういうのを思い描けば良さそうです。

ところで、Sink のクラス宣言を見るとわかるように、SinkObserver として振舞うと同時に Disposable を継承している存在でもあります。SinkObserver としての実装を知っているので、合わせてdispose 処理も責任を持つということでしょう。この辺はなんとなく、SinkDisposable だと考えるととイメージがつかみづらいです。RxJSでは、 DisposableSubscription にリネームされてましたが、たしかに、Sinkのようなクラスを見ていると、disposeできるもの〜という表現よりも、subscribeという行為〜 という表現のほうがわかりやすい場合が多いかもしれないっすわ。

Generatorを持つSink

TailRecursiveSink は、Generator を保持しているSinkです。

class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Element: ObservableConvertibleType, S.Generator.Element.E == O.E>
    : Sink<O>
    , InvocableWithValueType {
  ...
}

Generatorというのは、外部イテレーターとも言いますが、Swiftにおける、コレクションを外部から列挙するためのインターフェイスのことを指します。

// たとえば
let g = [1,2,3].generate() // コレクションをGeneratorに変換
g.next() // => 1
g.next() // => 2
g.next() // => 3
g.next() // => nil

TailRecursiveSink は、排他制御がかなり入っていますが、そこをシカトして見てみると、 Observable として振舞えそうなオブジェクトの Generator を持ち、値が流れてくると、 Generator から 取り出したObservable で処理させます。 そして、取り出したObservableが error あるいは completed したら、Generator から次のObservableを取り出して処理させます。以下繰り返し。

これで何が嬉しいかというと、わかりやすい例として、 retry の実装がこれによって行われています。 retry の実装は、自身のひとつ前のObservable (つまりsource) を無限に返し続ける Generator を生成し、後は それを TailRecursiveSink で処理するだけ。ただし onError は無視。というものになっています。これだけで、エラーが起きたらもう一度最初から subscribe する挙動が実現できるとは感動です。 ここが Generator になっていることで、たとえば同じく TailRecursiveSink によって実装されている concat も、retry もクラスがわずか数十行という驚愕の短さになっています。

つづく

Sinkという名前の基底クラスが用意されているのは僕が見た限りRxSwiftのだけっぽかったですが、原理的には他言語のRxもかなり似ているはずです。たいたいこの辺を抑えておけば、オペレーターの実装を読んでいく手がかりになるのではないでしょうか。

ひきつづき、Schedulerを調べてどのへんがクリティカルセクションになりえるのか理解したり、Rx units について考察したりしていきたいです。

RxJSでMVVMやってる

Rxは、すごくUIを書くのに向いているのではないだろうか。アプリケーションの状態を山盛りの変数で管理することから解放され、状態から状態へ変換する関数を書けばよくなるから。

非同期処理を同期っぽく書きたいならawait でいいじゃん。UIイベントを宣言的に書きたければ 2-wayバインディングがあれば良いじゃん。という話では終わらず、その辺の問題解決に加えて、値の発生器を全て同じ宣言にまとめられ、状態変数がなくなるところが書いていて楽しいところです。

// たとえば、、
Observable.fromEvent(searchBox, 'input') // 検索窓に字が打ちこまれたら
  .debounce(500)                         // 0.5秒ごとに
  .map(e => e.target.value)              // 入力されたテキストを
  .filter(q => q.length > 0)             // 1文字以上の場合だけ
  .select(q => fetch(`https://kensaku.com?q=${q}`)  // 検索リクエストをなげる
  .switch()                             // リクエストが複数なら最後だけ使う。残りはキャンセル
  .mergeMap(res => res.json())          // JSONをデシリアライズしといて
  .subscribe( ...)                      // よろしく

というようなのが、よく見るかっこいい例で、UIイベントとHTTPリクエストについての複雑な処理を、1本の連続したメソッドチェインで宣言できます。値を受けとって値を返す関数と、値を受けとって値生成器(Observable) を返す関数を書くというのがほぼやることの全てになり、見た目がかっこいい感じです。

さて、最近のjavascriptは、言語仕様がc#っぽくなり、フレームワークに頼らずともコードの分割がそこそこ統一できますし、 underscore や lodash がなくてもコレクションの操作がそこそこいける。ESはブラウザ実装状況を心配せずともbabelやbrowserifyを使えばたぶんOK。DOM操作は、サポートすべきブラウザの状況が変わりつつあるため、jQueryがなくとも素のAPIで意外とシンプルに書けます。あと足りないものってなんだっけ? 個人的には、Rxを使えば、標準のDOM APIだけでかなりうまくやれるのではないかと考えているところです。

MVVM

jsは、起動時にUIの参照をもっていないため、UIとそれ以外のコードの分離がとても難しいという独特な問題があり、アプリケーションが大きくなるとそのことを考えたくなります。その辺りをうまくやろうとするフレームワーク的なものが、これまでハイペースで生まれていました。その盛り上がりたるやすさまじく、ついに仮想DOMというすげえアイデアが登場するに至り、その都会的な新しいセンスに人気が集まっています。コードからDOMを支配するのではなく、コードから生成し完全に支配できるDOMもどきをつくっちゃえばいいじゃん。後で実DOMと同期させればいいじゃん。と彼らは言います。たしかに、これがあれば、UIのコードもそれ以外のコードもはじめからすべてjavascript管理下なので、分離がかんたんだ。

仮想DOMはたしかにすごいですが、よく見ると新しい問題も生まれています。アプリケーションのコードをうまく分割するためには、当然仮想DOMのコンポーネントをネストさせることになりますが、そんなことしてしまうと、子の仮想DOMの変更を、rootの仮想DOMにまで通知しなければいけなくなります。子の仮想DOMが勝手に自身の管理下のDOMを変更したとしても、親の仮想DOMもそのことを知らなければ、アプリケーション全体の差分レンダリングができない。Fluxは、データの流れが一方向できれいだと主張していていましたが、それはある程度マーケティング的な側面があり、仮想DOMを使う以上は、内部の細かな変更をもらさず全体に通知するアーキテクチャに行くのは当たり前なはずです。

仮想DOMのネストどうするか問題。もしも、Rxをつかえばどうなるでしょうか。UI部品の変更タイミングを、Observableを使って通知することを考えてみます。すべてのUIコンポーネントが、仮想DOMを返す変わりに、「仮想DOMを生成するObservable」を返すようにすれば、たくさんのコンポーネントの変更を束ねることができます。アプリケーション内のネストしたコンポーネントすべての変更を監視し、最新の仮想DOMを受け取ってレンダリングする。この流れをRxで一本にまとめることができそうです。

ちょうどそのような仕組みをもった、RxJSと仮想DOMをつかったライブラリに cycle.js というものがあります。 cycle.jsは、Rxでなんでも処理できるから、アプリケーション全体がただの Rxの通り道でいいじゃん。という考え方におもいっきりハンドルを切っています。 cycle.js では、アプリケーション全体が 1つの関数です。この関数は、入力に Observableを受けとって、出力に新しいObservableを返します。

f:id:hadashia:20160207023811p:plain

図のように、cycle.jsのコンセプトは、アプリケーション全体がSinkになっていることがすごく特徴的です。そのことがすごく綺麗に見えて、目に映る全てのものが輝いてみえてしまう僕だったのですが、これを使ってアプリケーションを書いてみたところ、DOM、HTTP、他、全てのものがアプリケーション全体の入出力になっている点に意外と違和感を感じました。内部で発生する通信などのObservable生成は、すべてアプリケーションの外に投げてあげないといけないし、受けとるときも、誰が発行したかに関わらずアプリケーション全体への入力として渡ってきます。この仕様には、アプリケーションが内側に隠蔽したObservableがあっても別に良いのではないか? という疑問が出てきます。すべてのObservableを1本にまとめなくとも、こまぎれにして部分部分で生成/subscribeしたとしてもRxは充分強力に思えるからです。

f:id:hadashia:20160207023831p:plain

上の図は、Rx でMVVMをするための ReactiveProperty というライブラリの図です。MVVMでは、UI と、それを表現した値オブジェクトであるViewModelをつくり、ViewModelのプロパティを変更すると自動でUIがレンダリングされるような設計です。ReactivePropertyでは、このViewModelのプロパティを、ただの値ではなく、プリミティブな値をラップした ReactivePropetyとして持たせておきます。 UI側の責任は、ViewModelの値をUIに反映させること、UIイベントを ViewModel にわかる形で通知することだけです。そこにはほとんどRxの姿はありません。 ViewModelに変更通知を送った結果、それがどのように加工され、フィルタされ、キャンセルされ、別のObservableとまぜられようが、UIはなにもしりません。(UI部分を完全に切り離せるのは、XAML という仕組みがあるから当然かもしれませんが) ViewModelの仕事は、UIイベントを受けて、自身のReactivePropetyを書き換えることで完結します。なぜそれだけでアプリケーションが書けるかというと、UIイベントも、ReactivePropertyも、Observableとして扱えるようになっているからです。

cycle.jsとの最大のちがいは、アプリケーションへの入出力がUIがらみのもにになり、それ以外の仕事はたとえRxで処理されようとそうでなかろうとViewModel以下に分離されていることです。それぞれの責務にゆらぎがなく、ViewModelのテストが書きやすいことが利点です。

Rxで素朴にMVVM

てかんじでシンプルにMVVMすればいいんじゃないかなー。というのが今の僕の考えです。 javascriptには、XAMLのような、UIとReactivePropetyを自動で結びつける機能はありませんが、Observableなインターフェイスをもった値オブジェクトをつくることは簡単です。

import { BehaviorSubject } from 'rxjs/Rx'

class Variable {
  constructor(value) {
    this.subject = new BehaviorSubject(value)
  }

  get value() {
    return this.subject.value
  }

  set value(newValue) {
    this.subject.next(newValue)
  }

  get isUnsubscribed() {
    return this.subject.isUnsubscribed
  }

  get observable() {
    return this.subject
  }

  next(value) {
    this.subject.next(value)
  }

  error(error) {
    this.subject.error(error)
  }

  complete() {
    this.subject.complete()
  }
}

export default Variable

この Variable でラップした値を、ViewModel にもたせます。

class ViewModel {
  constructor(attrs) {
    this.keys = Object.keys(attrs)
    for (let key of this.keys) {
      this[key] = new Variable(attrs[key])
    }
  }
}

let vm = new ViewModel({ name: 'a' })

vm.name.subscribe(console.log)
//=> 'a' がコンソールにでる

vm.name.value = 'b'
//=> 'b' がコンソールにでる

vm.name.value = 'c'
//=> 'c' がコンソールにでる

Observable.of('hoge', 'fuga', 'fugo')
  .map(v => `Mr. ${v}`)
  .subscribe(vm.name)

//=> 'Mr. hoge' 'Mr. fuga' 'Mr. fugo' がコンソールにでる

Observable な値をもつ ViewModel ができました。 ためしにこれをつかって、TodoMVC的なやつを書いてみました。

github.com

javascriptには、XAMLのような、UIのプロパティをカスタムクラスにマッピングする機能がないので、まずはViewをレンダリングしたり、UIイベントをObservableに変換するものをつくります。

class TodoList extends Component {
  constructor({el}) {
    super({el})
    this.vm = new TodoListViewModel({ todos: [] })
    
    this.on('keypress', '.new-todo')
      .filter(e => e.which === 13)
      .map(e => e.target.value)
      .subscribe(this.vm.create())

    this.on('keypress', '.edit')
      .filter(e => e.which === 13)
      .map(e => {
        const i = e.target.closest('li').dataset.index
        const title = e.target.value
        return {i, title}
      })
      .subscribe(this.vm.update())

    this.on('dblclick', '.todo')
      .map(e => e.target.closest('li').dataset.index)
      .subscribe(this.vm.editing(true))

    this.on('blur', '.edit')
      .subscribe(this.vm.editing(false))

    this.on('keydown', '.edit')
      .filter(e => e.which === 27)
      .subscribe(this.vm.editing(false))

    this.on('change', '.toggle')
      .map(e => e.target.closest('li').dataset.index)
      .subscribe(this.vm.toggle())

    this.on('click', '.destroy')
      .map(e => e.target.closest('li').dataset.index)
      .subscribe(this.vm.destroy())
    
    this.bindDOM()
  }

  render() {
    return this.vm.todos.observable
      .map(todos => {
        console.log(todos)
        const remining  = todos.filter(todo => !todo.completed).length
        const completed = todos.length > 0 && remining === 0

        return h('div', [
          h('section.todoapp', [
            h('header.header', [
              h('h1', 'todos'),
              h('input.new-todo', { placeholder: 'What needs to be done?', autofocus: true })
            ]),

            h('sestion.main', [
              h('input#toggle-all.toggle-all', { type: 'checkbox', checked: false }),
              h('label', { for: 'toggle-all' }),
              h('ul.todo-list', todos.map((todo, i) => h('li', {
                className: (todo.editing ? 'editing' : ''),
                attributes: { 'data-index': i }
              }, [
                todo.editing ? h('input.edit', {value: todo.title }) :
                h('div.view', [
                  h('input.toggle', { type: 'checkbox', checked: todo.completed }),
                  h('label.todo', todo.title),
                  h('button.destroy')
                ])
              ])))
            ]),

            remining > 0 ?
              h('footer.footer', [
                h('span.todo-count', [
                  h('string', `${remining}`),
                  remining <= 1 ? ' Item' : ' Items'
                ]),
                h('ul.filters', [
                  h('li', h('a.selected', { href: '#' }, 'All')),
                  h('li', h('a', { href: '#' }, 'Active')),
                  h('li', h('a', { href: '#'}, 'Completed')),
                  (completed ? h('button.clear-completed', 'Clear completed') : null)
                ])
              ]) :
            null
          ]),
        
          h('footer.info', [
            h('p', 'Double-click to edit a todo'),
            h('p', 'Writen by')
          ])
        ])
      })
  }
}

Reactのような機能があまり必要でないので、仮想DOMには https://github.com/Matt-Esch/virtual-dom をつかってみました。 このクラスは、仮想DOMの構築と、UIイベントをViewModel に通知することが仕事です。 ちなみに、Rxもvirtual-domも独立したコンポーネントなので、仮想DOMをつかわずに、サーバでレンダリングした実DOMをこのクラスでいじるケースでも同じ仕組みが使えるでしょう。

render() は、仮想DOMオブジェクトを返すObservableを返します。これは基底クラスか、初期化する人がsubscribeして実DOMにぶちこむ想定です。 ViewModel の todos プロパティは、上で定義した Variable になっているため、変更を監視してレンダリングするまでをRxで処理できるようになっています。

気にいっているポイントは、

this.event('new-todo', 'keypress')
  .filter(e => e.which === 13)
  .map(e => e.target.value)
  .subscribe(this.vm.create())

という部分で、この例では、特定の要素の keypresイベントを、Enterキーだけフィルタし、テキスト入力を ViewModelに通知しています。 ViewModel の create() メソッドは、Observer を返します。ここでは、受けとった テキスト入力イベントを、なにか独自のやりかたで処理し、新しいTODOを追加して自身のVariableを更新するObserverですね。

class TodoListViewModel extends ViewModel {
  create() {
    return title => {
      if (title.length > 0) {
        let todos = this.todos.value
        todos.push({ title: title })
        this.todos.value = todos
      }
    }
  }
}

ここをObserverにしてしまうと、UIからのObservableの流れが寸断されちゃうのですが、UIとViewModelはできる限り分離したいのでむしろそのほうが良いんじゃないかなというかんじです。もし、HTTP通信が必要なら、

class TodoListViewModel extends ViewModel {
  create() {
    return title => {
      Observable.fromPromie(fetch(...))
        .mergeMap(req => req.json())
        .subscribe(todo => {
          let todos = this.todos.value
          todos.push({ title: title })
          this.todos.value = todos
        })
      }
    }
  }
}

のように、ViewModelがObservbaleを生成するなり、他のモデルに生成させるなりすればいいんじゃねえかなと今のところ考えています。ViewModel には、UIに関わることが混じらないのでテストも書きやすいとおもいます。

とここまでやって思ったのは、subscribeしたものを適切にdispose(unsubscribe)する仕組みが必要だったりするので、その辺りもアレしていきたいです。

暇なとき意味なく無害なコマンド打ってる

brew update と打つと、ほどよく時間がかかり、ほどよく字がいっぱい出るし、ほどよくアップデートした気持ちになるから、やる気がしないときによく打ってる。

cycle.js やってる

javascript がブラウザ上ではしりだしたとき、自分のコードは画面になにが表示されているのか何も知らなくて、document.getElement* だとか querySelector$ とかをしてまず検索してとってきてはじめてわかる。javascriptで画面をいじるときはまず検索。これは、画面内のすべてをjavascriptでハンドリングしたいとき 、とても奇妙に感じる。ネイティブアプリケーションならば、実行時に即、Viewのオブジェクトの参照にさわれる状態でスタートするわけだから、そうなってないのが若干おかしいという感覚。

DOMオブジェクトと自分のコードの距離はおもっているよりも離れていて、jqueryからangular的なものからreact的なものまで、その距離を埋めることに注目していたような気がする。Viewを完全に自分の支配下の置き言うことを聞かせる。angular前後でその目的は達成され、Viewは完全に言うことを聞くようになったのかもしれないけど、今度はコード全体のAPIがViewを中心にまわっていて、UIイベントは綺麗に書けるけどそれ以外を分離するためにコードベースがでかめだったりしていた。React も、Viewに値をどう結びつけるのか考える過程でアーキテクチャがでかめになっているようだ。

なぜ View とビジネスロジックを分離していくとフレームワークがでかくなるのか。考えてみると不思議だ。フロントエンドのプログラミングといえば非同期処理で、それはUI Eventだけじゃないはず。ウェブあぷりけーしょんなら必ず通信はついてまわるし、今後他の要素も増えてくると考えると、はじめからViewは入出力の重要な要素ではあれどその一部でしかないとも言える。

cycle.js はViewふくめた全ての入出力を統一的に扱う。ただし、図にあるとおり、アプリケーション全体で Observable をまとめていく必要があるので、統一的な方法で分離していくのがむずかしい感じはある。

どちらかというと cycle.js より やっぱ RxJS がめちゃめちゃ良い。 Rx と Matt-Esch/virtual-dom 的なものの組み合わせるもっと良い方法おもいついたらいいな。。