RxSwift 読んでる
RxSwiftをちょっとずつ読んでる。最初はちょっと継承構造が激しい印象を受けたけど、基本的に関数型のワードに疎い自分でもとても読みやすいです。 Rxの不思議を理解していくと、アプリケーション内の別のパラダイムと仲良くする方法を考えるときのヒントになったり、まだ見ぬ応用が待っている予感がします。
ObservableとSink
Rxは、関数型言語の知識や並行処理の知識がなくても普通に使えるため、まずはその辺の事柄に目をつむり、 Observable
がどう作用しているか理解するところから見ていきます。
さて、 Observable
のオペレーターメソッドはなにをしているんでしょうか。
Observable
に生えている map
やら flatMap
やらの各オペレーターメソッドは、新しいObservable
を作って返しますが、このとき返るのはもちろん、オペレーターごとの挙動がカスタマイズされた新しい Observable
です。RxSwiftでは、これは細かくサブクラスになっています。各オペレーターごとに Observable
のサブクラスがあるイメージです。これらのObserable
は、見た感じ、subscribe
されるまでほぼ何もしません。オペレーターメソッドを呼び出した時点でやることは、新しい Observable
をつくり、自身の1つ前の Observable
の参照を持つことだけ。ちょうど、 Observable
のメソッドチェインを表現した、リンクリストを作っているイメージです。
Observable.just(1) // Observableを返す .map { $0 + 1 } // これもObservable。↑justの参照を持つ。 .filter { $0 < 10 } // これもObservable。↑mapの参照を持つ。 .retry() // これもObservable。↑filterの参照を持つ。 .observeOn(scheduelr) // これも ↑ .take(5) // これも ↑
ちなみに、ひとつ前の Observable
は 内部では一貫して source
という名前で参照されます。たとえば上の例では、filter
の source
は map
です。
各オペレーターメソッドは Observable
を返しますが、Observable
というのはつまるところ subscribe
ができるオブジェクトのことです。ということは、メソッドチェインの途中で現れる数々の Observable
たちも、内部では subscribe
を持っているということになります。すると、それらメソッドチェイン途中の subscribe
では何が行なわれるのか? 言い換えると、メソッドチェインの途中の Observable
の Observer
は何なのか。
その答えが Sink
です。
RxSwift には Sink
という名前の基底クラスが用意されており、オペレーターの実装はこれによってなされています。 Sink
は、ストリームを流れてきた値を自身で処理した後、下の Observer
に値を流すという振舞いをします。
class Sink<O : ObserverType> : SingleAssignmentDisposable { ... }
オペレーターのメソッドチェインによって、Observable
から Observable
を参照するリストができていると上に書きましたが、 Sink
によって、今度は Observer
のチェインができます。Sink
はそれ自身が Observer
でもあり、同時に、1つ下のObserver
の参照も保持しています。自身の処理を実行したら、今度は1つ下の Observer
へ値を流します。( forwardOn
)
// subscribeしたときのイメージ図。下から上へ、subscribeしながら遡る source.source.source.subscribe(Sink(observer: Sink(observer: Sink(observer: observer)))) ↑ source.source.subscribe(Sink(observer: Sink(observer: observer))) ↑ source.subscribe(Sink(observer: observer)) ↑ subscribe(observer)
// 値が流れてきたときのイメージ図。上から下へ流れる sink.on(event) ↓ sink.observer.on(event) ↓ sink.observer.observer.on(event) ↓ sink.observer.observer.observer.on(event)
各オペレータが subscribe
されたとき、イメージではだいたい上の図のようなことが起こっています。厳密にいうと例外が多々ありますが、順番に値が流れるように適切にSinkがセットアップされるということについては、だいたいこういうのを思い描けば良さそうです。
ところで、Sink
のクラス宣言を見るとわかるように、Sink
は Observer
として振舞うと同時に Disposable
を継承している存在でもあります。Sink
は Observer
としての実装を知っているので、合わせてdispose
処理も責任を持つということでしょう。この辺はなんとなく、Sink
が Disposable
だと考えるととイメージがつかみづらいです。RxJSでは、 Disposable
が Subscription
にリネームされてましたが、たしかに、Sink
のようなクラスを見ていると、disposeできるもの〜という表現よりも、subscribeという行為〜 という表現のほうがわかりやすい場合が多いかもしれないっすわ。
Generatorを持つSink
TailRecursiveSink
は、Generator を保持しているSinkです。
class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Element: ObservableConvertibleType, S.Generator.Element.E == O.E> : Sink<O> , InvocableWithValueType { ... }
Generatorというのは、外部イテレーターとも言いますが、Swiftにおける、コレクションを外部から列挙するためのインターフェイスのことを指します。
// たとえば let g = [1,2,3].generate() // コレクションをGeneratorに変換 g.next() // => 1 g.next() // => 2 g.next() // => 3 g.next() // => nil
TailRecursiveSink
は、排他制御がかなり入っていますが、そこをシカトして見てみると、 Observable
として振舞えそうなオブジェクトの Generator
を持ち、値が流れてくると、 Generator
から 取り出したObservable
で処理させます。
そして、取り出したObservableが error あるいは completed したら、Generator
から次のObservableを取り出して処理させます。以下繰り返し。
これで何が嬉しいかというと、わかりやすい例として、 retry
の実装がこれによって行われています。
retry
の実装は、自身のひとつ前のObservable (つまりsource) を無限に返し続ける Generator を生成し、後は それを TailRecursiveSink
で処理するだけ。ただし onError は無視。というものになっています。これだけで、エラーが起きたらもう一度最初から subscribe する挙動が実現できるとは感動です。
ここが Generator になっていることで、たとえば同じく TailRecursiveSink
によって実装されている concat
も、retry
もクラスがわずか数十行という驚愕の短さになっています。
つづく
Sinkという名前の基底クラスが用意されているのは僕が見た限りRxSwiftのだけっぽかったですが、原理的には他言語のRxもかなり似ているはずです。たいたいこの辺を抑えておけば、オペレーターの実装を読んでいく手がかりになるのではないでしょうか。
ひきつづき、Schedulerを調べてどのへんがクリティカルセクションになりえるのか理解したり、Rx units について考察したりしていきたいです。