Kekeの日記

エンジニア、読書なんでも

リアクティブプログラミングとReactive Extentions

https://blog.oz-code.com/wp-content/uploads/2017/02/reactive-FB.png

画像引用: https://blog.oz-code.com/reactive-wpf-part-1-introduction-reactive-extensions/

本記事

本記事ではRxについてなるべくわかりやすく解説するつもりです。 また、私の過去の記事で注目していたメッセージシステムについてもリアクティブプログラミングと重ね合わせながら書いていきます。

また、これらはRxJavaをベースに書いているので、Java言語以外には使い方が少し違ったり、もっと機能が豊富かもしれませんので、ご了承してください。

Reative Extention(Rx)を体系的に知識をつけられるような記事を目指しています。

目次

Rxとは

注目される「リアクティブ」

ユーザーの何らかのイベントに対して反応するようなリアクティブさというものは注目されています。

というのも流れるデータの量も、イベントの量や種類も増大しています。

私が数ヶ月注目していたキューによるメッセージシステム(Apache BeamやCloud Pub/Sub)などもそのような同様の理由が言えるでしょう。

CPUのマルチコア化やGUIによってアプリケーションが並列や分散処理できる量もはるかに増えています。せっかくCPUがいくつもあるのに、逐次処理のプログラムで書くと1つのCPUでシングルスレッドで行なっているのも同様です。

並列処理の抽象的な概念である並行処理をするならば、複数の処理を同時に行うために便利なリアクティブプログラミングが注目されてきました。また、私自身、今後さらに注目されると思っています。

リアクティブプログラミングとは

リアクティブプログラミングとは、通知されてくるデータを受け取るたびに関連したプログラムが反応するようなものです。

インフラやバックエンド畑の人にはCloud Pub/Subで想像するとわかりやすいのですが、Subしてからはずっと待っていて、メッセージがくるとデータ処理をしてETLなどすると思います。そのような感じだと思ってもらって構いません。

イメージ図としては以下の通りです。これはApache Kafkaの場合です。

f:id:bobchan1915:20181014022504p:plain

引用: https://www.slideshare.net/mreinsch/apache-kafka-as-message-queue-for-your-microservices-and-other-occasions

また、Qiita記事だと執筆時は文章が更新されると定期的に保存されます。また、文章を更新しないかぎり保存はされません。時差はあるといえど、私の入力に対して反応しているのです。

もっと具体化していうと

リアクティブプログラミングとは、データストリームから流れてくるデータに対して、そのデータを受け取ったプログラムが処理する

ことです。ここでストリームとしたのは絶えずメッセージが流れているからです。

リアクティブである恩恵とは

最大の恩恵は、大量のデータを並行して扱えることでしょう。

特に、メッセージを通知する側としては、レスポンスを待つ必要がありません。

「Cloud Pub/Subのキューにいれたら、次の仕事」というように、HTTPリクエストのようにクライアントは待つ必要がありません。

キューにいれたら、あとは知らんぷりです。iOSのUIKitならば、例えばテキストが入力されると通知され、あとはエラーを出したり、アニメーションしたり、、、しかしテキストフィールドが知ることはテキストの内容だけです。あとはSubする側が処理するだけなので、非常に見渡しのよい実装になります。

Rxの構成要素

Reactive Streams

4つのプロトコル

以下のようにStreamsは構成されます。

  • Publisher
  • Subscriber

です。

Publisherがデータを生産して、Subscriberがデータを受け取り処理します。

流れとしては

  • Publisherが通知できる状態になればSubscriberします。このことはonSubscribeと呼ばれます。
  • Publisherがデータを生産するとSubscribernに渡します。これをonNextと呼びます。
  • Publisherがデータを通知し終えるとonCompleteという正常通知の通知をSubscriberに通知します。
  • ストリームの途中でエラーが発生するとonErrorを発火して通知します。

この4つのプロトコルが大事です。

4つのインターフェース

また、Steamsでは以下の4つのインターフェースがあります。

  • Publisher: データを生産し、通知する役割
  • Subscriber: 通知されたデータを受け取り処理する役割
  • Subscription: データの購読に関する処理をする役割
  • Processor: PublisherとSubscriberの両方を持つインターフェース

Reactive Streamのルール

以下のようなルールがあります。

  • onSubscribe通知は一回だけ行うこと
  • 通知は逐次的であるべきで、並列に何個も通知は来ない
  • onCompleteonErrorで終了する

Rxの仕組み

基本

バックプレッシャー(過去のメッセージも取得できること)を対応しているからも関係します。

例えばRxJavaでは

生産者 消費者
Streams対応あり Flowable Subscriber
Streams対応なし Observable Observer

と呼ばれています。Streamsなしはバックプレッシャーの機能がないので、データ数のリクエストはしません。

また、disposeという購読解除のメソッドが必要になります。

オペレーター

オペレータは生産者から消費者までにデータが届くために、データをフィルタリングしたり、変換したり、まとめたりしたりする機能をするものをオペレーターと呼びます。

また、時系列とオペレータの関係を示すために使われる図をマーブルダイアグラムと呼びます。

f:id:bobchan1915:20181014015204p:plain

本記事ではオペレータの種類までは説明しません。

また、関数型プログラミングの思想を強く受けていて、

  • 同じ入力に対しては毎回同じ結果を返す
  • 入力値や外部の値は変更しない

などのルールが適用されています。特にデータを受けて、データを消費しきるまでは副作用を起こさないようにするようにしましょう。これはストリーム処理の責任が管理しやすくするためなので、消費しきるまではデータベースをいじったり、オブジェクトに変更をしないようにしましょう。特に並行処理にすると、何がいつ変更をしたかが全く人間の頭では追えなくなります。

Scheduler

どのスレッドで実行するかをスケジューリングするものです。

特にデータを通知する側の処理範囲と、受け取る側の処理範囲を分断できるメリットがあります。

たとえばユーザーがiOSアプリ内でボタンを押す(メインスレッド)と、HTTPリクエストでデータを取得してきて(バックグラウンド)、取得次第TableViewに描画する(メインスレッド)ケースはたくさんあります。

特にモバイルアプリはUIの変更はメインスレッドで行わないといけないのですが、データ取得までメインスレッドでやっていたらユーザーは待たないといけなくなります。

ColdとHotの生産者

生産者にはふた種類います。

Cold Hot
購読関係 1対1 1対多
通知開始 購読されれば いつでも

基本的には生成時にはCold`です。なのでHotを使いたいときは変換する必要があって、メソッドを呼んだり、ProcessorSubject`を生成します。

Processor/Subject

ProcessorとはReactive Streamsで定義されているPublisherとSubscriberの両方の機能を持っているものです。

つまり、データを受け取れて、データを通知できるものです。また、以下のように種類があります。

種類 説明
PublishProcessor/Subject データを受けとったタイミングでしかデータを通知しないもの
BehaviorProcessor/BehaviorSubject 消費者を購読した直前のデータをバッファし、それから通知するもの
ReplayProcessor/ReplaySubject 受け取ったすべてのデータを途中から購読した消費者にも通知するもの
AsyncProcessor/AsyncSubject データの制せが完了したときに最後のデータしか通知しないもの
UnicastProcessor UnicastSubject | 一つの消費者しか購読されないもの

Single, Maybe, Completable

生産者になるものはFlowableObservableに加えて、通知を行うクラスが三つあります。

それは以下の通りで

名前 説明
Single データを一件だけ通知する
Maybe データを一件だけ通知するか、一件も通知せずに完了を通知するか、エラーを通知するクラス
Completable データを一件も通陽せずに完了するか、もしくはエラーを通知するクラス

です。このときは専用の消費者が存在して

生産者 消費者
Single SingleObserver
Maybe MaybeObserver
Completable CompletableObserver

まとめ

本記事でRxで出てくるような大きな用語・概念は取り扱うことができたのかなと思います。

実際にやってみる方が恩恵を感じるので、ぜひやってみてください。

私は少しRxSwiftをやっていて、これからRxJSをやってみようと思っている段階です。

参考文献

Rx公式ページ

ReactiveX