RxSwift 読んでる

RxSwiftをちょっとずつ読んでる。最初はちょっと継承構造が激しい印象を受けたけど、基本的に関数型のワードに疎い自分でもとても読みやすいです。 Rxの不思議を理解していくと、アプリケーション内の別のパラダイムと仲良くする方法を考えるときのヒントになったり、まだ見ぬ応用が待っている予感がします。

ObservableとSink

Rxは、関数型言語の知識や並行処理の知識がなくても普通に使えるため、まずはその辺の事柄に目をつむり、 Observable がどう作用しているか理解するところから見ていきます。

さて、 Observable のオペレーターメソッドはなにをしているんでしょうか。 Observable に生えている map やら flatMap やらの各オペレーターメソッドは、新しいObservableを作って返しますが、このとき返るのはもちろん、オペレーターごとの挙動がカスタマイズされた新しい Observable です。RxSwiftでは、これは細かくサブクラスになっています。各オペレーターごとに Observable のサブクラスがあるイメージです。これらのObserable は、見た感じ、subscribe されるまでほぼ何もしません。オペレーターメソッドを呼び出した時点でやることは、新しい Observable をつくり、自身の1つ前の Observable の参照を持つことだけ。ちょうど、 Observableメソッドチェインを表現した、リンクリストを作っているイメージです。

Observable.just(1)      // Observableを返す
  .map { $0 + 1 }       // これもObservable。↑justの参照を持つ。
  .filter { $0 < 10 }   // これもObservable。↑mapの参照を持つ。
  .retry()              // これもObservable。↑filterの参照を持つ。
  .observeOn(scheduelr) // これも ↑
  .take(5)              // これも ↑

ちなみに、ひとつ前の Observable は 内部では一貫して source という名前で参照されます。たとえば上の例では、filtersourcemap です。

各オペレーターメソッドObservable を返しますが、Observable というのはつまるところ subscribe ができるオブジェクトのことです。ということは、メソッドチェインの途中で現れる数々の Observable たちも、内部では subscribe を持っているということになります。すると、それらメソッドチェイン途中の subscribe では何が行なわれるのか? 言い換えると、メソッドチェインの途中の ObservableObserver は何なのか。

その答えが Sink です。 RxSwift には Sink という名前の基底クラスが用意されており、オペレーターの実装はこれによってなされています。 Sink は、ストリームを流れてきた値を自身で処理した後、下の Observer に値を流すという振舞いをします。

class Sink<O : ObserverType> : SingleAssignmentDisposable {
  ...
}

オペレーターのメソッドチェインによって、Observable から Observable を参照するリストができていると上に書きましたが、 Sink によって、今度は Observer のチェインができます。Sink はそれ自身が Observer でもあり、同時に、1つ下のObserverの参照も保持しています。自身の処理を実行したら、今度は1つ下の Observer へ値を流します。( forwardOn )

// subscribeしたときのイメージ図。下から上へ、subscribeしながら遡る

source.source.source.subscribe(Sink(observer: Sink(observer: Sink(observer: observer))))
↑
source.source.subscribe(Sink(observer: Sink(observer: observer)))
↑
source.subscribe(Sink(observer: observer))
↑
subscribe(observer)
// 値が流れてきたときのイメージ図。上から下へ流れる

sink.on(event)
↓
sink.observer.on(event) 
↓
sink.observer.observer.on(event)
↓
sink.observer.observer.observer.on(event)

各オペレータが subscribe されたとき、イメージではだいたい上の図のようなことが起こっています。厳密にいうと例外が多々ありますが、順番に値が流れるように適切にSinkがセットアップされるということについては、だいたいこういうのを思い描けば良さそうです。

ところで、Sink のクラス宣言を見るとわかるように、SinkObserver として振舞うと同時に Disposable を継承している存在でもあります。SinkObserver としての実装を知っているので、合わせてdispose 処理も責任を持つということでしょう。この辺はなんとなく、SinkDisposable だと考えるととイメージがつかみづらいです。RxJSでは、 DisposableSubscription にリネームされてましたが、たしかに、Sinkのようなクラスを見ていると、disposeできるもの〜という表現よりも、subscribeという行為〜 という表現のほうがわかりやすい場合が多いかもしれないっすわ。

Generatorを持つSink

TailRecursiveSink は、Generator を保持しているSinkです。

class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Element: ObservableConvertibleType, S.Generator.Element.E == O.E>
    : Sink<O>
    , InvocableWithValueType {
  ...
}

Generatorというのは、外部イテレーターとも言いますが、Swiftにおける、コレクションを外部から列挙するためのインターフェイスのことを指します。

// たとえば
let g = [1,2,3].generate() // コレクションをGeneratorに変換
g.next() // => 1
g.next() // => 2
g.next() // => 3
g.next() // => nil

TailRecursiveSink は、排他制御がかなり入っていますが、そこをシカトして見てみると、 Observable として振舞えそうなオブジェクトの Generator を持ち、値が流れてくると、 Generator から 取り出したObservable で処理させます。 そして、取り出したObservableが error あるいは completed したら、Generator から次のObservableを取り出して処理させます。以下繰り返し。

これで何が嬉しいかというと、わかりやすい例として、 retry の実装がこれによって行われています。 retry の実装は、自身のひとつ前のObservable (つまりsource) を無限に返し続ける Generator を生成し、後は それを TailRecursiveSink で処理するだけ。ただし onError は無視。というものになっています。これだけで、エラーが起きたらもう一度最初から subscribe する挙動が実現できるとは感動です。 ここが Generator になっていることで、たとえば同じく TailRecursiveSink によって実装されている concat も、retry もクラスがわずか数十行という驚愕の短さになっています。

つづく

Sinkという名前の基底クラスが用意されているのは僕が見た限りRxSwiftのだけっぽかったですが、原理的には他言語のRxもかなり似ているはずです。たいたいこの辺を抑えておけば、オペレーターの実装を読んでいく手がかりになるのではないでしょうか。

ひきつづき、Schedulerを調べてどのへんがクリティカルセクションになりえるのか理解したり、Rx units について考察したりしていきたいです。