画像引用: https://blog.oz-code.com/reactive-wpf-part-1-introduction-reactive-extensions/
本記事
本記事ではRx
についてなるべくわかりやすく解説するつもりです。
また、私の過去の記事で注目していたメッセージシステムについてもリアクティブプログラミングと重ね合わせながら書いていきます。
また、これらはRxJavaをベースに書いているので、Java言語以外には使い方が少し違ったり、もっと機能が豊富かもしれませんので、ご了承してください。
Reative Extention(Rx)を体系的に知識をつけられるような記事を目指しています。
目次
Rxとは
注目される「リアクティブ」
ユーザーの何らかのイベントに対して反応するようなリアクティブさというものは注目されています。
というのも流れるデータの量も、イベントの量や種類も増大しています。
私が数ヶ月注目していたキューによるメッセージシステム(Apache BeamやCloud Pub/Sub)などもそのような同様の理由が言えるでしょう。
CPUのマルチコア化やGUIによってアプリケーションが並列や分散処理できる量もはるかに増えています。せっかくCPUがいくつもあるのに、逐次処理のプログラムで書くと1つのCPUでシングルスレッドで行なっているのも同様です。
並列処理の抽象的な概念である並行処理をするならば、複数の処理を同時に行うために便利なリアクティブプログラミングが注目されてきました。また、私自身、今後さらに注目されると思っています。
リアクティブプログラミングとは
リアクティブプログラミングとは、通知されてくるデータを受け取るたびに関連したプログラムが反応するようなものです。
インフラやバックエンド畑の人にはCloud Pub/Subで想像するとわかりやすいのですが、Subしてからはずっと待っていて、メッセージがくるとデータ処理をしてETLなどすると思います。そのような感じだと思ってもらって構いません。
イメージ図としては以下の通りです。これはApache Kafkaの場合です。
また、Qiita記事だと執筆時は文章が更新されると定期的に保存されます。また、文章を更新しないかぎり保存はされません。時差はあるといえど、私の入力に対して反応しているのです。
もっと具体化していうと
リアクティブプログラミングとは、データストリームから流れてくるデータに対して、そのデータを受け取ったプログラムが処理する
ことです。ここでストリームとしたのは絶えずメッセージが流れているからです。
リアクティブである恩恵とは
最大の恩恵は、大量のデータを並行して扱えることでしょう。
特に、メッセージを通知する側としては、レスポンスを待つ必要がありません。
「Cloud Pub/Subのキューにいれたら、次の仕事」というように、HTTPリクエストのようにクライアントは待つ必要がありません。
キューにいれたら、あとは知らんぷりです。iOSのUIKitならば、例えばテキストが入力されると通知され、あとはエラーを出したり、アニメーションしたり、、、しかしテキストフィールドが知ることはテキストの内容だけです。あとはSubする側が処理するだけなので、非常に見渡しのよい実装になります。
Rxの構成要素
Reactive Streams
4つのプロトコル
以下のようにStreamsは構成されます。
- Publisher
- Subscriber
です。
Publisherがデータを生産して、Subscriberがデータを受け取り処理します。
流れとしては
- Publisherが通知できる状態になればSubscriberします。このことは
onSubscribe
と呼ばれます。 - Publisherがデータを生産するとSubscribernに渡します。これを
onNext
と呼びます。 - Publisherがデータを通知し終えると
onComplete
という正常通知の通知をSubscriberに通知します。 - ストリームの途中でエラーが発生すると
onError
を発火して通知します。
この4つのプロトコルが大事です。
4つのインターフェース
また、Steamsでは以下の4つのインターフェースがあります。
- Publisher: データを生産し、通知する役割
- Subscriber: 通知されたデータを受け取り処理する役割
- Subscription: データの購読に関する処理をする役割
- Processor: PublisherとSubscriberの両方を持つインターフェース
Reactive Streamのルール
以下のようなルールがあります。
onSubscribe
通知は一回だけ行うこと- 通知は逐次的であるべきで、並列に何個も通知は来ない
onComplete
かonError
で終了する
Rxの仕組み
基本
バックプレッシャー(過去のメッセージも取得できること)を対応しているからも関係します。
例えばRxJavaでは
生産者 | 消費者 | |
---|---|---|
Streams対応あり | Flowable | Subscriber |
Streams対応なし | Observable | Observer |
と呼ばれています。Streamsなしはバックプレッシャーの機能がないので、データ数のリクエストはしません。
また、dispose
という購読解除のメソッドが必要になります。
オペレーター
オペレータは生産者から消費者までにデータが届くために、データをフィルタリングしたり、変換したり、まとめたりしたりする機能をするものをオペレーターと呼びます。
また、時系列とオペレータの関係を示すために使われる図をマーブルダイアグラムと呼びます。
本記事ではオペレータの種類までは説明しません。
また、関数型プログラミングの思想を強く受けていて、
- 同じ入力に対しては毎回同じ結果を返す
- 入力値や外部の値は変更しない
などのルールが適用されています。特にデータを受けて、データを消費しきるまでは副作用を起こさないようにするようにしましょう。これはストリーム処理の責任が管理しやすくするためなので、消費しきるまではデータベースをいじったり、オブジェクトに変更をしないようにしましょう。特に並行処理にすると、何がいつ変更をしたかが全く人間の頭では追えなくなります。
Scheduler
どのスレッドで実行するかをスケジューリングするものです。
特にデータを通知する側の処理範囲と、受け取る側の処理範囲を分断できるメリットがあります。
たとえばユーザーがiOSアプリ内でボタンを押す(メインスレッド)と、HTTPリクエストでデータを取得してきて(バックグラウンド)、取得次第TableViewに描画する(メインスレッド)ケースはたくさんあります。
特にモバイルアプリはUIの変更はメインスレッドで行わないといけないのですが、データ取得までメインスレッドでやっていたらユーザーは待たないといけなくなります。
ColdとHotの生産者
生産者にはふた種類います。
Cold | Hot | |
---|---|---|
購読関係 | 1対1 | 1対多 |
通知開始 | 購読されれば | いつでも |
基本的には生成時にはCold`です。なので
Hotを使いたいときは変換する必要があって、メソッドを呼んだり、
Processorや
Subject`を生成します。
Processor/Subject
ProcessorとはReactive Streamsで定義されているPublisherとSubscriberの両方の機能を持っているものです。
つまり、データを受け取れて、データを通知できるものです。また、以下のように種類があります。
種類 | 説明 |
---|---|
PublishProcessor/Subject | データを受けとったタイミングでしかデータを通知しないもの |
BehaviorProcessor/BehaviorSubject | 消費者を購読した直前のデータをバッファし、それから通知するもの |
ReplayProcessor/ReplaySubject | 受け取ったすべてのデータを途中から購読した消費者にも通知するもの |
AsyncProcessor/AsyncSubject | データの制せが完了したときに最後のデータしか通知しないもの |
UnicastProcessor | UnicastSubject | 一つの消費者しか購読されないもの |
Single, Maybe, Completable
生産者になるものはFlowable
とObservable
に加えて、通知を行うクラスが三つあります。
それは以下の通りで
名前 | 説明 |
---|---|
Single | データを一件だけ通知する |
Maybe | データを一件だけ通知するか、一件も通知せずに完了を通知するか、エラーを通知するクラス |
Completable | データを一件も通陽せずに完了するか、もしくはエラーを通知するクラス |
です。このときは専用の消費者が存在して
生産者 | 消費者 |
---|---|
Single | SingleObserver |
Maybe | MaybeObserver |
Completable | CompletableObserver |
まとめ
本記事でRxで出てくるような大きな用語・概念は取り扱うことができたのかなと思います。
実際にやってみる方が恩恵を感じるので、ぜひやってみてください。
私は少しRxSwiftをやっていて、これからRxJSをやってみようと思っている段階です。
参考文献
Rx公式ページ