Kekeの日記

エンジニア、読書@1915keke

メッセージングセマンティクスEffectively-once

本記事

image.png

Apache Kafkaとよく似たApache Pulsarと呼ばれる新しいOSSが登場しました。 本記事はPub/Sub型メッセージ配送方法の信頼性について解説し、Pulsarで登場したセマンティクスEffectively-onceを解説します。

メッセージ配送方式とは

Pub/Sub型メッセージ配送方法には3つの構成要員がいます。 大雑把に解説すると

  • Publisher: データをトピックにpushする人
  • Topic: データをsubscripberに流してあげる人
  • Subsriber: データを受信する人

のようになります。略図は以下のようになります。

image.png (引用:https://cloud.google.com/pubsub/docs/overview?hl=ja)

初心者の方に説明すると「なぜ必要なのか?httpで送れよ」とよく言われます。

ユーザーの追跡している場合など、POSTなどでデータを投げ続けたいときがあります。 そのリクエストを受信・永続化するAPIサーバーが常に受信できるとは限りません。 ユーザが大量にログインしてて、いっぱいいっぱいでレスポンスを返せません。

そのようなときに

  • タイムアウト
  • 永遠に再送する
  • クライアント(ブラウザやアプリは何も起きない)

など直面してはいけない問題ばかりです。 しかし、このPub/Sub型はSubscriberの都合で受信するようになります。

つまり、クライアントからのリクエストはトピック内にpushだけしといて、終わることができます。そのような面でリアルタイムデータ処理などに向いているので多用されています。

データの保証

高速に扱えたりする一方でデータを保証するのが難しいです。 今回のPulsarではEffectively-onceというセマンティクスだと言われていますが、従来では以下のようなセメンティクスがあります。

  • at most once: メッセージは一度しか送信されないが、欠損する可能性がある。
  • at least once: メッセージは確実に届くが、重複する可能性がある。例:fluendGoogle Cloud Pub/Subなど。
  • exactly once: メッセージは欠損も重複もなく届けられる。Apache Flinkなど。

Apache Kafkaは設定により変えることができます。

それぞれについて軽く解説します。

1. at most once

パケットが破棄されたりされても再送しないのがat most onceです。 このようなシステムのときは再送をお願いすることになります。 しかし、これだとまた重複してしまう危険性があり、実用的ではありません。

2. at least once

メッセージの再送を代表に、重複するケースがあります。 しかし、それはシステム上困るケースがあります。たとえば購入などです。

一回の購入で、何回も決済されたらたまったもんじゃありません。 そのためにUUIDをつけるなど重複排除が必要です。

3. exactly once

もっとも理想的であるゆえに、システムは複雑です。 また、どのくらい保証するかという問題はソフトウェアごとに依存するので注意が必要です。

実際にはコーディネータと呼ばれるものでさらにPublisherSubscriberの間に入ります。 その理由は、Topicが仲介人となってPub/Sub型は成り立っているため、PublisherSubscriberは相互の情報を持たないからです。

しかしながら、コーディネータ自身もソフトウェアなので、落ちるケースがあり、やはり現実味がないと言えると思います。

そこで「そもそもexactly once」なのか可能なのか?また、コンセンサス方式、アトミックブロードキャストなど新しい議論が上がっています。

新(?)セメンティクスEffectively-once

定義

We prefer to use the term effectively-once over “exactly-once” because it highlights the fact that a messaging system needs to be able to detect and discard duplicate messages and to do so with a 100% degree of accuracy. We call this feature message deduplication.

日本語訳

私たちはexactly-onceに対してeffectively-onceを使います。それは100%の精度でメッセージは重複を検知し、排除することを強調しています。私たちはメッセージ重複排除と呼ぶ。

詳しく解説してきます。

どういうことなのか?

Effectively Onceでは、メッセージは重複して送られて来ても大丈夫です。これは一般的には問題です。 もっとも重要なのはstate(状態)として一度しか観測されないようになっていることです。

つまり、

exactly-onceはどのようにして重複がなくデータを処理するかに着目していて、`effectively-onceはどのようにして結果として重複がないように処理に取り出せるかに着目しています。

何かデータベースの強整合性結果整合性に若干考え方は似ていますね。

どのようにやっているの?

1. Producer(PulsarのPublisher)の冪等性

再送されたものをシステムは、特定して廃棄しないといけません。再送するケースは分散処理を目的にしているシステムだといろんな理由で容易に起きます。

  • ブローカー(Pub/Subの仲介人)でデータがクラッシュ
  • ネットワークの切断など
  • Publisherがクラッシュしたのきの復旧時

Pulsarのクライアントライブラリでは確かに届けるように、at least onceを保証するように再送します。

v1.20からメッセージ重複排除をサポートしていて、namespace設定から可能にします。

例としては

$ pulsar-admin namespaces set-deduplication $MY_NAMESPACE --enable

です。ここではPulsarのブローカーは重複を排除します。

再度、強調しますがEffectively Once

重複して送信してもいいのです。ただシステムで重複排除をして、取り出すとき重複を無くします。

ブローカーレベルの重複排除の仕組みを説明します。

各ブローカーは各パブリッシャーの最後に成功したpublishされたメーセージのIDを追跡します。また、スナップショットも定期的にとっていて、ブローカーがクラッシュしたときに、クラッシュした前と、クラッシュした後で、一緒になることを保証します。

Pulsarではカーソルを使って保存していて、メモリ上で保存します。 メッセージN個ごとにスナップショットをとって、更新します。 マッピングはproducer -> last sequence IDのようになっていています。

復旧時には最後のスナップショットと同じになるように再構成されます。

2. Producerがクラッシュしたとき

ここまではPulsar型、つまりブローカーが重複を排除する仕組みを解説していました。

ここではProducerがクラッシュしてしまったときを解説します。

Producerがブローカーに接続するとUIDが割り当てられ、データにつきsequence ID0から連続的に決まります。

もし、Producerがクラッシュして、再度トピックに接続するとどのようになるでしょう?

Javaだと以下のコードだとProducerは識別できません。

Producer producer = client.createProducer(TOPIC_NAME, conf);

しかし、連続性を保つ(=Producerを識別する)にはProducerの名前を識別できればよいのです。なので名前をつけます。

ProducerConfiguration conf = new ProducerConfiguration();
conf.setProducerName("my-producer");
conf.setSendTimeout(0, TimeUnit.SECONDS);
Producer producer = client.createProducer(TOPIC_NAME, conf);

また以下のスクリプトで最後のsequece IDに接続することができます。

long lastSequenceId = producer.getLastSequenceId();

ここでは100%近くを保証しますが、ステートレスの代表であるHTTPリクエストのように再送できないものがあります。

Consumerが重複無しにメッセージを受け取る

データはシステム内部のあらゆるレベルで重複排除されます。なので簡単に重複無しに取り出すことができます。

Consumer consumer = client.subscribe(MY_TOPIC, MY_SUBSCRIPTION_NAME);

while (true) {
    Message msg = consumer.receive();
    // Process the message...
    consumer.acknowledge(msg);
}

最終的な重複排除はアプリケーション側で何か対応しないといけませんがat least onceで運用していることがケースで重複排除に関する手法はかなり知られているのでここでは紹介しません。

パフォーマンス

実験結果のデータはなかったですが、Kafkaなどと比べて顕著なオーバーヘッドはないようです。

結論

PulsarではEffectively Onceをジオレプリーケションの場合でも保証します。 メッセージ重複排除は非常に高い保証性があるので、重要なデータを扱う中では強力なものになると思います。

参考文献