昨日、Apache Pulsar2.1.0がリリースされました。 そこで大きな変化について解説したいと思います。
個人的には2.1.0
によって、かなり使い勝手の良いものになったのではないかと思っているので解説していこうと思います。
Agenda
0 Apache Pulsar入門 1 PulsarIO 2 Tiered Storage 3 Stateful Function 4 Go Clientの追加 5 Schemaの追加
0. Apache Pulsar入門
公式サイト
https://pulsar.incubator.apache.org/
インストール方法
スタンドアロンで動かすことができます。
公式でガイドがあるのでご覧ください。
何をしてくれるのか?
Apache Pulsar is an open-source distributed pub-sub messaging system originally created at Yahoo and now part of the Apache Software Foundation
日本語訳
Yahooで主に開発されて、今はApacheSoftwareFoundationの一部であるオープンソースな分散Pub/Subメッセージングシステム
特徴
公式サイトによくまとまっていたので引用させていただきます。
- 複数クラスタのネイティブサポートとクラスタを横断するメッセージのシームレスなジオレプリケーション
- メッセージの発行及びエンドツーエンドの通信が低レイテンシ
- 100万以上のトピックに対するシームレスなスケーラビリティ
- Java, Python, C++によるシンプルなクライアントAPI
- トピックに対する複数のサブスクリプションモード (Exclusive, Shared, Failover)
- Apache BookKeeperを用いた永続ストレージによる配信保証
構成
三つの構成要素があります。
- Producer
- Broker
- Consumer
です。それぞれが何をするのかというと簡単にいうと
- Producer: トピックにデータを流し込む
- Broker: トピックを保持し、ProcuderとConsumerの仲介人になる
- Consumer: トピックをSub(購読)し、データを流し込んでもらう
以下のような図になります。
ここから本題です
1. PulsarIO
PulsarIOとはSource
とSink
に挟まれているものになります。
何をするのかというと非常に単純です。めちゃめちゃ簡単にSourceからデータを受け取り、Sinkに加工して流し込みます。
たとえば公式には執筆段階では以下のものがあります。
- Aerospike sink
- Cassandra sink
- Kafka source
- Kafka sink
- Kinesis sink
- RabbtMQ source
- Twitter Firehose source
ここでTwitter Firehose sourceが何をするかと名前にもsource
があるようにTwitterというソースからデータを流し込んでくれます。
もちろんTwitter APIのtoken
とsecret key
が必要です。
しかし、コードをほとんど書かずに設定することができ、これらのtoken
などはconfig.yml
に記述して読ませるだけで、twitterの情報をストリーミングすることができます。
2. Tiered Storage
Apache PulsarではBookkepper
というシステムを使って、Bookie
というエントリを保存することでデータを永続し、配信を保証しています。
Pulsarではトピックの中でSegment
と呼ばれる要素に分けます。
そして分散処理のためにコピーを異なるBookieに保存します。
(引用:https://streaml.io/blog/pulsar-segment-based-architecture/)
個人的にはスケールアウトできるものはKubernetes
一択と思っていましたが、運用コストを考えるとストレージに以上にコストがかかってしまいます。
そこで例としてAmazon S3
やGoogle Cloud Storage
などに保存することが考えられます。
一般的にはコストが小さいのが特徴です。
これからは、Segment
で、別のストレージに保存をわけたりすることができるようになります。
3. Stateful Function
(引用:https://streaml.io/blog/pulsar-segment-based-architecture/)
システムはConsumerからack
を受け取っていないときは、勝手にデータを消さないことを保証していて、この方式は永続メッセージングと言われます。
Pulsarでは全てのメッセージのn個のコピーがRAIDボリュームに永続的に保存され、ここでは状態を記録するような一時的なストレージはサポートしていませんでした。
また、日本語サイトではまだ更新されておらず
現時点ではPulsarは永続的なメッセージストレージのみをサポートしています。これは全てのトピック名の先頭の近い将来、Pulsarは一時的なメッセージストレージをサポートします。
と書かれていますが、新たなStateAPI
によってそれが可能となります。
リリースノートではcount
というstate
で文字数を一時的に状態として保存していていました。
以下の状態で直接クエリを投げてstate
を確認できます。
$ bin/pulsar-admin functions querystate \ --tenant <tenant> \ --namespace <namespace> \ --name <function-name> \ --state-storage-url <bookkeeper-service-url> \ --key <state-key> \ [---watch]
4. Go Clientの追加
あまり説明することがないのでGo Clientのドキュメントをご覧ください。
(注意)
これでJava
, python
, c++
、golang
のクライアントが出ましたが、これらにないからといって諦める必要はないので注意してください。(WebsocketなどAPIがあります。)
5. Schemaの追加
「データを流し込む」や「データを渡す」などと簡単にこれまではいっていました。 具体的に何しているのかというとデータをシリアライズしているわけです。
しかし、これまで問題点がありました。それは、、、シリアライズをしたデータを人間が読んでもわからないということであり、これでは運用が難しいものがあります。
Pulsarでは以下のバイナリプロトコルに対応しています。
- ProtocolBuffer
- JSON
- Avro
個人的にはAvro
が来てくれて嬉しいのですが、ご利用方法に合わせて選択すれば良いと思います。
まとめ
- PulsarIOで実験してみるのもよし。
- Tiered Storageでコスト改善に
- Statuful Functionで
State
が楽に扱える - Go Client追加うれしい
- Schema追加で、嬉しい人には嬉しい(はず)