Kekeの日記

エンジニア、読書@1915keke

Apache Pulsar 2.1.0の大進化

昨日、Apache Pulsar2.1.0がリリースされました。 そこで大きな変化について解説したいと思います。

個人的には2.1.0によって、かなり使い勝手の良いものになったのではないかと思っているので解説していこうと思います。

Agenda

0 Apache Pulsar入門 1 PulsarIO 2 Tiered Storage 3 Stateful Function 4 Go Clientの追加 5 Schemaの追加

0. Apache Pulsar入門

公式サイト

https://pulsar.incubator.apache.org/

インストール方法

スタンドアロンで動かすことができます。

公式でガイドがあるのでご覧ください。

何をしてくれるのか?

Apache Pulsar is an open-source distributed pub-sub messaging system originally created at Yahoo and now part of the Apache Software Foundation

日本語訳

Yahooで主に開発されて、今はApacheSoftwareFoundationの一部であるオープンソースな分散Pub/Subメッセージングシステム

特徴

公式サイトによくまとまっていたので引用させていただきます。

  • 複数クラスタのネイティブサポートとクラスタを横断するメッセージのシームレスなジオレプリケーション
  • メッセージの発行及びエンドツーエンドの通信が低レイテンシ
  • 100万以上のトピックに対するシームレスなスケーラビリティ
  • Java, Python, C++によるシンプルなクライアントAPI
  • トピックに対する複数のサブスクリプションモード (Exclusive, Shared, Failover)
  • Apache BookKeeperを用いた永続ストレージによる配信保証

構成

三つの構成要素があります。

  • Producer
  • Broker
  • Consumer

です。それぞれが何をするのかというと簡単にいうと

  • Producer: トピックにデータを流し込む
  • Broker: トピックを保持し、ProcuderとConsumerの仲介人になる
  • Consumer: トピックをSub(購読)し、データを流し込んでもらう

以下のような図になります。

image.png

ここから本題です

1. PulsarIO

image.png

PulsarIOとはSourceSinkに挟まれているものになります。 何をするのかというと非常に単純です。めちゃめちゃ簡単にSourceからデータを受け取り、Sinkに加工して流し込みます。

たとえば公式には執筆段階では以下のものがあります。

  • Aerospike sink
  • Cassandra sink
  • Kafka source
  • Kafka sink
  • Kinesis sink
  • RabbtMQ source
  • Twitter Firehose source

ここでTwitter Firehose sourceが何をするかと名前にもsourceがあるようにTwitterというソースからデータを流し込んでくれます。

もちろんTwitter APIのtokensecret keyが必要です。 しかし、コードをほとんど書かずに設定することができ、これらのtokenなどはconfig.ymlに記述して読ませるだけで、twitterの情報をストリーミングすることができます。

2. Tiered Storage

Apache PulsarではBookkepperというシステムを使って、Bookieというエントリを保存することでデータを永続し、配信を保証しています。

Pulsarではトピックの中でSegmentと呼ばれる要素に分けます。 そして分散処理のためにコピーを異なるBookieに保存します。

image.png (引用:https://streaml.io/blog/pulsar-segment-based-architecture/)

個人的にはスケールアウトできるものはKubernetes一択と思っていましたが、運用コストを考えるとストレージに以上にコストがかかってしまいます。

そこで例としてAmazon S3Google Cloud Storageなどに保存することが考えられます。 一般的にはコストが小さいのが特徴です。

これからは、Segmentで、別のストレージに保存をわけたりすることができるようになります。

image.png

3. Stateful Function

image.png (引用:https://streaml.io/blog/pulsar-segment-based-architecture/)

システムはConsumerからackを受け取っていないときは、勝手にデータを消さないことを保証していて、この方式は永続メッセージングと言われます。 Pulsarでは全てのメッセージのn個のコピーがRAIDボリュームに永続的に保存され、ここでは状態を記録するような一時的なストレージはサポートしていませんでした。

また、日本語サイトではまだ更新されておらず

現時点ではPulsarは永続的なメッセージストレージのみをサポートしています。これは全てのトピック名の先頭の近い将来、Pulsarは一時的なメッセージストレージをサポートします。

と書かれていますが、新たなStateAPIによってそれが可能となります。 リリースノートではcountというstateで文字数を一時的に状態として保存していていました。

以下の状態で直接クエリを投げてstateを確認できます。

$ bin/pulsar-admin functions querystate \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <function-name> \
    --state-storage-url <bookkeeper-service-url> \
    --key <state-key> \
    [---watch]

4. Go Clientの追加

あまり説明することがないのでGo Clientのドキュメントをご覧ください。

(注意) これでJava, python, c++golangのクライアントが出ましたが、これらにないからといって諦める必要はないので注意してください。(WebsocketなどAPIがあります。)

5. Schemaの追加

「データを流し込む」や「データを渡す」などと簡単にこれまではいっていました。 具体的に何しているのかというとデータをシリアライズしているわけです。

しかし、これまで問題点がありました。それは、、、シリアライズをしたデータを人間が読んでもわからないということであり、これでは運用が難しいものがあります。

Pulsarでは以下のバイナリプロトコルに対応しています。

  • ProtocolBuffer
  • JSON
  • Avro

個人的にはAvroが来てくれて嬉しいのですが、ご利用方法に合わせて選択すれば良いと思います。

まとめ

  • PulsarIOで実験してみるのもよし。
  • Tiered Storageでコスト改善に
  • Statuful FunctionでStateが楽に扱える
  • Go Client追加うれしい
  • Schema追加で、嬉しい人には嬉しい(はず)