Kekeの日記

エンジニア、読書@1915keke

Apache Stormのトポロジーチュートリアル

f:id:bobchan1915:20180816182740p:plain

本記事ではApache Stormのトポロジーを中心しに解説していこうと思います。

前提知識

StormのAPIがデザインパターンBuilderを使用しますので知らなければ学習してください。

用語

  • topology: SpoutとBoltから構成されるネットワーク構造。
  • tuple: 流れるデータの集まりである。値のリストである。
  • Spout: ストリームのソース。
  • Bolt: 処理を施すノード。

Data modelを定義する

データモデルとしてタプルを使用します。

トポロジの一つ一つは、output fieldを定義しなくてはなりません。 ノードがどのようなoutputを出すのかを定義しないといけません。

@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    

ここでは["double", "triple"]を定義しています。

トポロジーを作る

一つのSpoutと二つのBoltを定義する。

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

ここでは逐次的な構築をしている。 10, 3などのマジカルナンバーはその構成要素の数(スレッド数)を表す。

ShuffleGroupingとは各ボルトが等しい数のタプルを受け取れるようにするもの。 FieldGroupingはフィールドごとにボルトを設定できるもの。

Spoutについて

SpoutはタプルをnextTupleでストリームすることができる。

たとえばSpoutは以下のようになります。

public class RandomSentenceSpout extends BaseRichSpout {
     @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
            sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
    final String sentence = sentences[_rand.nextInt(sentences.length)];

    LOG.debug("Emitting tuple: {}", sentence);

    _collector.emit(new Values(sentence));
  }
}

Boltについて

Boltは変換処理をする。

重要な点として、以下の点があります。

  • execute: 処理を実行する。awkを返さないといけない。
  • declareOutputFields: 出力のタプルのフィールドを指定する。
public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

BaseBasicBoltを継承してBoltを定義します。

Topologyをデプロイする

storm jar [jarファイル] [クラス名] [...引数]

まとめ

  • とにかく@Overrideする項目を定義すればokです。
  • スレッド数など簡単に設定できる一方で、正しく設定するには経験が必要そう

Apache Azkaban, Airflow, Luige, Oozie比較

本記事

Apache Azkaban, Airflow, Luige, Oozieのこれらはワークフロー管理に使われるOSSです。 HadoopやSparkなどを使って分散処理をナイトリーに行なっている場合、そのスケージュールリングだったり監視などが大変になってきます。

そのような中で、ワークフロー管理OSSがあるわけですが、どのようにして選定できるかを比較しながら考えます。

tl;dr

比較表は以下の通り。

OSS名 開発元 目的 主な開発方法 留意点
Azkaban LinkedIn hoge Hadoopのバッジ処理を時間単位でスケージューリングする目的。 GUI  リッチなUIで監視、検索したりすることができる。
Airflow Airbnb 2015 DAGSのようにワークフローをモニタし、認証し、スケージューリングする目的。 Luigeのようにプログラムベース リッチなCLIツールやUIがある
Luige Spotify 2011 複雑なパイプラインを構築することを手伝う。 Pythonスクリプトベース Hadoop Supportがあるが、何もかもに対応しようとしている(Hive, Spark)'。コミュニティが大きい。
Oozie Yahoo 見つけれず Azkabanと同様にHadoopのバッジ処理をスケージューリングする目的。  XMLベース Azkabanが時間単位でスケージューリングするのに対して、時間、イベント、データの状況でジョブをトリガーできる(データがないのにバッジ処理をしてしまうのを防ぐ)。 もっとも古い。

1. Azkaban

1.1 UI

f:id:bobchan1915:20180806212552p:plain

2. Airflow

2.1 UI

f:id:bobchan1915:20180806212903j:plain

3. Luige

3.1 UI

f:id:bobchan1915:20180806212928j:plain

4. Oozie

4.1 UI

f:id:bobchan1915:20180806212753p:plain

結論

Hadoopでしか使わないか、他に使うかで分かれると感じる。

使う場合はAzkaban、他に使う場合はAirflowLuigeでいいのではないでしょうか?

もう一つの考え方としては、「宣言的にかけるかどうか」。

Azkabanは主にGUIなので、同じくプログラムで書きたかったらAirflowLuigeになる。

やっぱり、差があまり感じられないのでテスト運用してみて、感じたことがあれば追記します。

PrestoでApache Kafkaに分散クエリを投げる

スクリーンショット 2018-08-14 21.02.12.png

対象読者

  • 分散SQL環境を構築したい方
  •  Prestoに入門したい方
  •  Kafkaの中身にクエリを投げたい方

Prestoとは

presto-og.png

分散SQLクエリエンジン 

以下のようなアーキテクチャです。

image.png

特徴 - 分散しているので高速なクエリが叩ける。Hiveはバッジ処理を目的とするため遅い。 - 複数のデータソースにクエリを投げれる。 - 似た者でApache HiveApache Impalaがあり、これらはどれもSQLエンジン。

詳しくは以下の記事をご覧ください。

tug.red

Apache Kafkaとは

download.png

分散ストリーミング基盤

世界の上場している企業の半数近くが採用しているらしいです。

特徴

  • ストリームを配信することができる
  • 分散してキューを取り出すことによって分散処理ができる

詳しくは以下の記事をご覧ください。

kafka.apache.org

PrestoでKafkaにクエリを投げる

ヘキサゴナルアーキテクチャ故に、概念が抽象化されています。

1. Connectorを定義する

Tutorialにもありますが、いくつかプロパティを定義しないといけません。 一覧にまとめました。

property 何を設定するか
kafka.table-names テーブル名
kafka.nodes Kafkaのクラスタのhostname:port
kafka.connect-timeout タイムアウトまでの時間
kafka.table-descrition-dir トピックの説明ファイル(.json)が入ったディレクトリ
kafka.default-schema テーブルのデフォルトのスキーマ
kafka.buffer-size バッファサイズ
kafka.hide-internal-columns 内部カラムがテーブルのスキーマに入るのかを定義 

 一つ一つ解説をしましょう。

kafka.table-names カンマ,で分けることにより複数定義することができます。

ここで定義されるテーブルはそれぞれ説明ファイルを定義することができる。 説明ファイルがなければtable名=topic名である。

<schema-name>.<table-name>のように名前をつけるとよい。

kafka.default-schema schemaがあると例えばトピックfront.userがあったとして、frontででもクエリを投げることができます。 これはkafkaの要請ではないので、あくまでもprestoを使ううえでのことです。

kafka.table-description-dir 省略可。 特に細かな設定をしない場合はきにすることがありません。

デフォルトでは、etc/kafkaに配置してあります。

テーブル定義ファイル

Kafkaのトピックはシリアライズされた物が入っているので、Prestoではこれらのデータはマッピングされないといけない。

.jsonで定義する。

いまのところ、Apache Avroはサポートされていません。

やっと投げてみる

設定したのを前提に進めていきます。

1. デモ用のトピックにデータをpublishする

あらかじめkafka Producer APIを使って1~99の連番をトピックに書き込みました。

試しにkafka CLIを使ってみて見ると

kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic liveai.greet --from-beginning         20:19 14-08-2018
1
3
4
7
8
9
13
14
16
19
22
24
25
28
29
30
31
32
33
35
39
40
44
46
50
53
57
59
60
64
65
66
70
71
73
75
77
78
79
80
81
84
87
88
90
92
96
97
99

値が飛び飛びになっているものは、このトピックがパーティション数を2にしているからです。だいたい半分取得できているのでいいでしょう。

2. Presto CLIを使って接続します。

私の場合はcatalog名が kafkaで、スキーマがliveaiなので以下のようになります。

./presto --catalog kafka --schema liveai
presto:liveai>|

はじめに付け足すと、tabを使うと補完できるので知っておくと便利です。

presto:liveai>
CREATE TABLE     DESCRIBE         DROP TABLE       EXPLAIN          HELP             QUIT             SELECT           SHOW CATALOGS    SHOW COLUMNS     SHOW FUNCTIONS   SHOW SCHEMAS     SHOW SESSION     SHOW TABLES      USE

そして、テーブル名を見てみます。

presto:liveai> show tables;
 Table
-------
 greet
(1 row)

https://localhost:8080/uiでは、以下のようになります。(しょっぱなからサーバーをたて忘れていて、INTERNAL ERRORがででいますが気にしないでください。

スクリーンショット 2018-08-14 20.33.13.png

ここで注意してほしいのが、prest serverがたっていようとなかろうとpresto対話シェルは起動できます。 私がエラーを出したときもサーバーがたっていないことに気づきませんでした。

3. topic内をクエリで検索

SQL言語です。

presto:liveai> SELECT count(*) FROM greet;
 _col0
-------
   100
(1 row)

Query 20180814_122455_00019_4tvz4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [100 rows, 190B] [230 rows/s, 437B/s]

ちゃんと100個ありますね。

UIは以下のように更新されています。

スクリーンショット 2018-08-14 21.28.35.png

まとめ

どちらもスタンドアローンモードがあるので試すのは簡単。ぜひ試してみてください。

Apache AvroとProtocol Buffers

本記事

f:id:bobchan1915:20180813130055p:plain

f:id:bobchan1915:20180813130059p:plain

Apache Kafkaなどメッセージ配送のためのOSSを使用していると何かしらシリアライズすることになります。 そこでXMLJSON, ASN.1など選択しないといけません。どれが一体良いのでしょうか?

本記事では、どのようにしてシリアライズすればいいかを解説していこうと思います。

Schemaに求められるものとは

1. ロバストネス

ストリーム処理をする中で、もっとも良い点がProducerとConsumerが切り離されていることになります。

しかし、なぜどのデータが正しいか保証できるのでしょう? スキーマがなければ、データが欠損したり、データがコピーできなかったり問題があります。

2. 明確さとセマンティック

アプリケーション間で基準的なものがなく、「データ」というものが不明瞭になります。 必ずといっていいほどバグを招きます。 最近のドキュメントを梱包するべきです。

3. 互換性

スキーマはもっとも問題である「データの変更」に対して対処することができます。 プロダクトが成長するについて、必ずといっていいほどデータの追加、削除または変更があります。 特にデータベースへの保存をするときに、決定的なエラーを招きます。

特にたくさん使われているAPIサーバーやデータベースだと変更に対して、どこへ影響を及ぼすかわかりません。 また、すでに保存している数TBのものを変更しないといけなくなります。

スキーマを設けることでCassandraやHadoopなどに変更なしに格納できるようにしたいです。

4. 会話としてのスキーマ

Producerのアプリを作っている人は、だいたいConsumerであることは限られています。

つまり、スキーマをもってどのようなデータがくるかなど予測する必要があります。

用語

  • スキーマ: データをバイナリ形式にシリアラズ、デシリアライズするための規格。

なぜ必要か

Kafkaなどでストリーミングをしているときに、自分が

  • どのようなデータ形式がくるか
  • どのような型で、デフォルト値はどうなのか
  • どうデータ形式が変化するのか

を全て知り尽くしているケースならば、特に気にすることはありません。 しかし、チームや会社単位で使い回す時に、何かしらの「決まりごと」が必要です。

そのためのものです。

Apache Avro

特徴

  • リッチなデータ構造に対応できる
  • コンパクト、高速なバイナリ形式
  • 永続化ためのコンテナファイル
  • RPC
  • 動的言語との統合性

Apache avroのスキーマはjsonで定義し、以下のようになります。

 { 
     "namespace": "com.test.avro", 
     "type": "record", 
     "name": "Data",  
     "fields": [ 
         {"name": "age", "type": "int"} 
     ] 
 } 

必ずしもjsonを定義する必要がなく、動的に設定することができます。

また

  • Avroはバイナリ化したファイルに格納されるため、のちのアプリで使うことができます。
  • JSONでスキーマは定義されるので、JSONライブラリがある言語では使うことができます。

Apache Kafkaなどと同様に使いたい時は以下のリンクが参考になるかもしれません。

Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages

Protocol Buffers

Google内部で使われていたシリアライズの方法である。

スキーマ定義のための言語があって、以下のような言語であり、.protoに定義するようになる。

syntax = "proto3";
package example.protobuf;

message SimpleMessage {
    message Items {
        string name = 1;
        string value = 2;
    }

    uint64 id = 1;
    Type message_type = 2;
    repeated HeaderItem headers = 3;
}

=1などはタグと呼ばれ、時間に普遍なものを定義する。 これによってフィールドの名前が変更されても、タグで識別できるということである。

2つの比較

  • 動的タイピング: Avroの場合、何かプログラムを書かなくてもすみます。データは全てスキーマと合わせて定義される。
  • 非タグデータ: データが読まれる時にスキーマは有効なため、相当、型の情報が必要ではない。Protocol Bufferのセクションでname = 1などタグを定義する必要がなく、型を定義しないといけなかった。
  • IDを手動で操作する必要がない: スキーマが変更されたときに、データを処理する時にはすでに存在する。Protocol Bufferのセクションでname = 1などタグを定義する必要がない。
  • JSONから直接変換できるか: Avroの場合できる。

以下のようなデータがあったとする。

{
  "time": 1424849130111,
  "customer_id": 1234,
  "product_id": 5678,
  "quantity":3,
  "payment_type": "mastercard"
}

するとスキーマは以下のようになる。

{
  "type": "record",
  "doc":"This event records the sale of a product",
  "name": "ProductSaleEvent",
  "fields" : [
    {"name":"time", "type":"long", "doc":"The time of the purchase"},
    {"name":"customer_id", "type":"long", "doc":"The customer"},
    {"name":"product_id", "type":"long", "doc":"The product"},
    {"name":"quantity", "type":"int"},
    {"name":"payment",
     "type":{"type":"enum",
         "name":"payment_types",
             "symbols":["cash","mastercard","visa"]},
     "doc":"The method of payment"}
  ]
}

これによって - ProducerやConsumerがどのような情報をやりとりするかを知ることができる。 - ドキュメント自体もdocフィールドで梱包している。 - 有効なデータしかトピックに送信されてないため、それ以降のデータ処理を守る。

勉強会などにいってもKafkaトピックごとに設定しないという情報が蔓延しているがSchema Registryというものがあるので、そこにスキーマを登録しておける。 ただし、Conflent社がやっているConfluent Platform上での使えるものです。

参考文献

YamlのAnchor&、Extend<<、Extended Inline<< &について

本記事

CIやKubernetesなどインフラに関する設定項目は、多くの場合yamlで書くようになります。 つまり、yamlをマスターすることが何よりもCIやPaasなどのドキュメントを読むくらい大事なのです。

CircleCIやテンプレートを書くときなど、yamlを書くことは多い一方で、肥大化しまいがちです。 &<<を使うことで簡単にファイル内モジュール化のようなことができるので紹介します。

注意

アンカーとして定義するfooは出力されないと仮定します。

yamlは外部ファイルなどにモジュールをまとめられないのか?」とよく聞かれますが、できないようです。

ちなみに文字列連結などもできないようなので、できることが非常に限られています。

Anchor&

&を使ってアンカーを定義し、*で呼び出すことができます。

用途: 使いまわしたいとき

foo: &foo
 name: "Pupu"
 age: 12

bar: *foo
{
    "bar": {
        "name": "Pupu"
        "age": 12
    }
}

のようになります。

Extend<<

あるアンカーを呼び出して、さらに同じレベルで追加します。

用途: あるAnchorをさらに拡張したいとき。

foo: &foo
 name: "Pupu"
 age: 12

bar:
   <<: *foo
   email: "kukakuka@gmail.exp"
{
    "bar": {
        "name": "Pupu"
        "age": 12
        "email": "kukakuka@gmail.exp"
    }
}

Extended Inline << &

一部しか継承したくないときに使用する。

用途: とあるアンカーをextend<<したいが、一部しかしたくない

foo:  &foo
 <<: &foo_password
   password: "nemeneme"
 name: "Pupu"
 age: 12

bar:
   <<: *foo_password
   email: "kukakuka@gmail.exp"
{
    "bar": {
        "password": "nemeneme",
        "email": "kukakuka@gmail.exp"
    }
}

デバッグ方法のすすめ

yqというpythonのライブラリを使うと簡単に確認をすることができます。

pip install yq

cat config.yml | yq "."

まとめ

  • anchorextendを使ってyamlで書かれた設定を管理しやすくすることができる。
  • また、refercensesなどに設定をまとめることで可読性も高めることができる。

メッセージングセマンティクスEffectively-once

本記事

image.png

Apache Kafkaとよく似たApache Pulsarと呼ばれる新しいOSSが登場しました。 本記事はPub/Sub型メッセージ配送方法の信頼性について解説し、Pulsarで登場したセマンティクスEffectively-onceを解説します。

メッセージ配送方式とは

Pub/Sub型メッセージ配送方法には3つの構成要員がいます。 大雑把に解説すると

  • Publisher: データをトピックにpushする人
  • Topic: データをsubscripberに流してあげる人
  • Subsriber: データを受信する人

のようになります。略図は以下のようになります。

image.png (引用:https://cloud.google.com/pubsub/docs/overview?hl=ja)

初心者の方に説明すると「なぜ必要なのか?httpで送れよ」とよく言われます。

ユーザーの追跡している場合など、POSTなどでデータを投げ続けたいときがあります。 そのリクエストを受信・永続化するAPIサーバーが常に受信できるとは限りません。 ユーザが大量にログインしてて、いっぱいいっぱいでレスポンスを返せません。

そのようなときに

  • タイムアウト
  • 永遠に再送する
  • クライアント(ブラウザやアプリは何も起きない)

など直面してはいけない問題ばかりです。 しかし、このPub/Sub型はSubscriberの都合で受信するようになります。

つまり、クライアントからのリクエストはトピック内にpushだけしといて、終わることができます。そのような面でリアルタイムデータ処理などに向いているので多用されています。

データの保証

高速に扱えたりする一方でデータを保証するのが難しいです。 今回のPulsarではEffectively-onceというセマンティクスだと言われていますが、従来では以下のようなセメンティクスがあります。

  • at most once: メッセージは一度しか送信されないが、欠損する可能性がある。
  • at least once: メッセージは確実に届くが、重複する可能性がある。例:fluendGoogle Cloud Pub/Subなど。
  • exactly once: メッセージは欠損も重複もなく届けられる。Apache Flinkなど。

Apache Kafkaは設定により変えることができます。

それぞれについて軽く解説します。

1. at most once

パケットが破棄されたりされても再送しないのがat most onceです。 このようなシステムのときは再送をお願いすることになります。 しかし、これだとまた重複してしまう危険性があり、実用的ではありません。

2. at least once

メッセージの再送を代表に、重複するケースがあります。 しかし、それはシステム上困るケースがあります。たとえば購入などです。

一回の購入で、何回も決済されたらたまったもんじゃありません。 そのためにUUIDをつけるなど重複排除が必要です。

3. exactly once

もっとも理想的であるゆえに、システムは複雑です。 また、どのくらい保証するかという問題はソフトウェアごとに依存するので注意が必要です。

実際にはコーディネータと呼ばれるものでさらにPublisherSubscriberの間に入ります。 その理由は、Topicが仲介人となってPub/Sub型は成り立っているため、PublisherSubscriberは相互の情報を持たないからです。

しかしながら、コーディネータ自身もソフトウェアなので、落ちるケースがあり、やはり現実味がないと言えると思います。

そこで「そもそもexactly once」なのか可能なのか?また、コンセンサス方式、アトミックブロードキャストなど新しい議論が上がっています。

新(?)セメンティクスEffectively-once

定義

We prefer to use the term effectively-once over “exactly-once” because it highlights the fact that a messaging system needs to be able to detect and discard duplicate messages and to do so with a 100% degree of accuracy. We call this feature message deduplication.

日本語訳

私たちはexactly-onceに対してeffectively-onceを使います。それは100%の精度でメッセージは重複を検知し、排除することを強調しています。私たちはメッセージ重複排除と呼ぶ。

詳しく解説してきます。

どういうことなのか?

Effectively Onceでは、メッセージは重複して送られて来ても大丈夫です。これは一般的には問題です。 もっとも重要なのはstate(状態)として一度しか観測されないようになっていることです。

つまり、

exactly-onceはどのようにして重複がなくデータを処理するかに着目していて、`effectively-onceはどのようにして結果として重複がないように処理に取り出せるかに着目しています。

何かデータベースの強整合性結果整合性に若干考え方は似ていますね。

どのようにやっているの?

1. Producer(PulsarのPublisher)の冪等性

再送されたものをシステムは、特定して廃棄しないといけません。再送するケースは分散処理を目的にしているシステムだといろんな理由で容易に起きます。

  • ブローカー(Pub/Subの仲介人)でデータがクラッシュ
  • ネットワークの切断など
  • Publisherがクラッシュしたのきの復旧時

Pulsarのクライアントライブラリでは確かに届けるように、at least onceを保証するように再送します。

v1.20からメッセージ重複排除をサポートしていて、namespace設定から可能にします。

例としては

$ pulsar-admin namespaces set-deduplication $MY_NAMESPACE --enable

です。ここではPulsarのブローカーは重複を排除します。

再度、強調しますがEffectively Once

重複して送信してもいいのです。ただシステムで重複排除をして、取り出すとき重複を無くします。

ブローカーレベルの重複排除の仕組みを説明します。

各ブローカーは各パブリッシャーの最後に成功したpublishされたメーセージのIDを追跡します。また、スナップショットも定期的にとっていて、ブローカーがクラッシュしたときに、クラッシュした前と、クラッシュした後で、一緒になることを保証します。

Pulsarではカーソルを使って保存していて、メモリ上で保存します。 メッセージN個ごとにスナップショットをとって、更新します。 マッピングはproducer -> last sequence IDのようになっていています。

復旧時には最後のスナップショットと同じになるように再構成されます。

2. Producerがクラッシュしたとき

ここまではPulsar型、つまりブローカーが重複を排除する仕組みを解説していました。

ここではProducerがクラッシュしてしまったときを解説します。

Producerがブローカーに接続するとUIDが割り当てられ、データにつきsequence ID0から連続的に決まります。

もし、Producerがクラッシュして、再度トピックに接続するとどのようになるでしょう?

Javaだと以下のコードだとProducerは識別できません。

Producer producer = client.createProducer(TOPIC_NAME, conf);

しかし、連続性を保つ(=Producerを識別する)にはProducerの名前を識別できればよいのです。なので名前をつけます。

ProducerConfiguration conf = new ProducerConfiguration();
conf.setProducerName("my-producer");
conf.setSendTimeout(0, TimeUnit.SECONDS);
Producer producer = client.createProducer(TOPIC_NAME, conf);

また以下のスクリプトで最後のsequece IDに接続することができます。

long lastSequenceId = producer.getLastSequenceId();

ここでは100%近くを保証しますが、ステートレスの代表であるHTTPリクエストのように再送できないものがあります。

Consumerが重複無しにメッセージを受け取る

データはシステム内部のあらゆるレベルで重複排除されます。なので簡単に重複無しに取り出すことができます。

Consumer consumer = client.subscribe(MY_TOPIC, MY_SUBSCRIPTION_NAME);

while (true) {
    Message msg = consumer.receive();
    // Process the message...
    consumer.acknowledge(msg);
}

最終的な重複排除はアプリケーション側で何か対応しないといけませんがat least onceで運用していることがケースで重複排除に関する手法はかなり知られているのでここでは紹介しません。

パフォーマンス

実験結果のデータはなかったですが、Kafkaなどと比べて顕著なオーバーヘッドはないようです。

結論

PulsarではEffectively Onceをジオレプリーケションの場合でも保証します。 メッセージ重複排除は非常に高い保証性があるので、重要なデータを扱う中では強力なものになると思います。

参考文献

Apache Pulsar 2.1.0の大進化

昨日、Apache Pulsar2.1.0がリリースされました。 そこで大きな変化について解説したいと思います。

個人的には2.1.0によって、かなり使い勝手の良いものになったのではないかと思っているので解説していこうと思います。

Agenda

0 Apache Pulsar入門 1 PulsarIO 2 Tiered Storage 3 Stateful Function 4 Go Clientの追加 5 Schemaの追加

0. Apache Pulsar入門

公式サイト

https://pulsar.incubator.apache.org/

インストール方法

スタンドアロンで動かすことができます。

公式でガイドがあるのでご覧ください。

何をしてくれるのか?

Apache Pulsar is an open-source distributed pub-sub messaging system originally created at Yahoo and now part of the Apache Software Foundation

日本語訳

Yahooで主に開発されて、今はApacheSoftwareFoundationの一部であるオープンソースな分散Pub/Subメッセージングシステム

特徴

公式サイトによくまとまっていたので引用させていただきます。

  • 複数クラスタのネイティブサポートとクラスタを横断するメッセージのシームレスなジオレプリケーション
  • メッセージの発行及びエンドツーエンドの通信が低レイテンシ
  • 100万以上のトピックに対するシームレスなスケーラビリティ
  • Java, Python, C++によるシンプルなクライアントAPI
  • トピックに対する複数のサブスクリプションモード (Exclusive, Shared, Failover)
  • Apache BookKeeperを用いた永続ストレージによる配信保証

構成

三つの構成要素があります。

  • Producer
  • Broker
  • Consumer

です。それぞれが何をするのかというと簡単にいうと

  • Producer: トピックにデータを流し込む
  • Broker: トピックを保持し、ProcuderとConsumerの仲介人になる
  • Consumer: トピックをSub(購読)し、データを流し込んでもらう

以下のような図になります。

image.png

ここから本題です

1. PulsarIO

image.png

PulsarIOとはSourceSinkに挟まれているものになります。 何をするのかというと非常に単純です。めちゃめちゃ簡単にSourceからデータを受け取り、Sinkに加工して流し込みます。

たとえば公式には執筆段階では以下のものがあります。

  • Aerospike sink
  • Cassandra sink
  • Kafka source
  • Kafka sink
  • Kinesis sink
  • RabbtMQ source
  • Twitter Firehose source

ここでTwitter Firehose sourceが何をするかと名前にもsourceがあるようにTwitterというソースからデータを流し込んでくれます。

もちろんTwitter APIのtokensecret keyが必要です。 しかし、コードをほとんど書かずに設定することができ、これらのtokenなどはconfig.ymlに記述して読ませるだけで、twitterの情報をストリーミングすることができます。

2. Tiered Storage

Apache PulsarではBookkepperというシステムを使って、Bookieというエントリを保存することでデータを永続し、配信を保証しています。

Pulsarではトピックの中でSegmentと呼ばれる要素に分けます。 そして分散処理のためにコピーを異なるBookieに保存します。

image.png (引用:https://streaml.io/blog/pulsar-segment-based-architecture/)

個人的にはスケールアウトできるものはKubernetes一択と思っていましたが、運用コストを考えるとストレージに以上にコストがかかってしまいます。

そこで例としてAmazon S3Google Cloud Storageなどに保存することが考えられます。 一般的にはコストが小さいのが特徴です。

これからは、Segmentで、別のストレージに保存をわけたりすることができるようになります。

image.png

3. Stateful Function

image.png (引用:https://streaml.io/blog/pulsar-segment-based-architecture/)

システムはConsumerからackを受け取っていないときは、勝手にデータを消さないことを保証していて、この方式は永続メッセージングと言われます。 Pulsarでは全てのメッセージのn個のコピーがRAIDボリュームに永続的に保存され、ここでは状態を記録するような一時的なストレージはサポートしていませんでした。

また、日本語サイトではまだ更新されておらず

現時点ではPulsarは永続的なメッセージストレージのみをサポートしています。これは全てのトピック名の先頭の近い将来、Pulsarは一時的なメッセージストレージをサポートします。

と書かれていますが、新たなStateAPIによってそれが可能となります。 リリースノートではcountというstateで文字数を一時的に状態として保存していていました。

以下の状態で直接クエリを投げてstateを確認できます。

$ bin/pulsar-admin functions querystate \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <function-name> \
    --state-storage-url <bookkeeper-service-url> \
    --key <state-key> \
    [---watch]

4. Go Clientの追加

あまり説明することがないのでGo Clientのドキュメントをご覧ください。

(注意) これでJava, python, c++golangのクライアントが出ましたが、これらにないからといって諦める必要はないので注意してください。(WebsocketなどAPIがあります。)

5. Schemaの追加

「データを流し込む」や「データを渡す」などと簡単にこれまではいっていました。 具体的に何しているのかというとデータをシリアライズしているわけです。

しかし、これまで問題点がありました。それは、、、シリアライズをしたデータを人間が読んでもわからないということであり、これでは運用が難しいものがあります。

Pulsarでは以下のバイナリプロトコルに対応しています。

  • ProtocolBuffer
  • JSON
  • Avro

個人的にはAvroが来てくれて嬉しいのですが、ご利用方法に合わせて選択すれば良いと思います。

まとめ

  • PulsarIOで実験してみるのもよし。
  • Tiered Storageでコスト改善に
  • Statuful FunctionでStateが楽に扱える
  • Go Client追加うれしい
  • Schema追加で、嬉しい人には嬉しい(はず)

モバイルアプリ開発とガチインフラ構築をやってみる

f:id:bobchan1915:20180615141541p:plain

はじめに

完全に日記です

特に技術的に解説してみようと思っていなく、実装の感想などを主に書いています。 簡単なモバイルアプリを作りましたが、アニメーションを除けば基本的な機能は実装してます。

動機

研究室ではスーパコンピューターを使ってのシミュレーションがメインなので、世の中のお金を生むような、いわゆる「サービス」から離れてしまいました。

今も論文のアブストラクトを執筆・教授に添削してもらっている途中です......

感覚を麻痺させないためにも、1日程度まるっと時間を作って単純なサービスを開発してみたという記事です。

インフラ構成

CIはBitriseを使っていますが省略してます。他の本質でないところも省略してます。

f:id:bobchan1915:20180615141541p:plain

コメント

言語をRust, Lisp, Rubyを使った理由

  • Rust: ゲームプラットフォームと名前が一緒だったから。
  • Lisp: パッケージ管理ライブラリがRoswellという名前で、自分が小学生のときにニューメキシコ州に住んでいてロズウェルによく遊びに行っていたから。
  • Ruby: Slackへの通知は単純だし面白みもないので秒速終えたかったから。

総括すると適当です。

インフラに対して

  • GKEを用いた理由は簡単な戦略的なデプロイが可能であり、チームで運用しやすいから。スケール性も考慮。
  • ロギングとしてfluntdを採用した。機械学習のオンライン学習に用いることができるから。自分がオンライン学習用の環境が欲しかったから。
  • MySQLは永続性と使いやすさを考慮して導入。
  • モバイルデータベースrealmはただ使いたかったから。ユーザーの情報を格納。

作成したアプリ

アプリの名前は「Gino」です。 ログインするとGithubリポジトリを検索することができるだけのアプリケーションです。 お気に入りすると検索結果を保存しておくことができます。

一部のUIを載せます。

f:id:bobchan1915:20180615145747p:plain

ただそれだけです。特に目的もないです。

ライブラリの導入を検討しているときに役に経ちました。

最後の画面は特にすることがなく、空白にしてます。 あとウォークスルー型のチュートリアルを作りましたがコンテンツ不足のためデザインが酷くなり載せません。

Firebaseは無料枠が使い切っていたので使いませんでした。

最後に

タイトルではガチってやってみると書きましたが、まだまだやりたいことがたくさんあり、達成度は60%くらいです。 また、デザインや良いUX体験の提供など細かな点は捨ててますが簡単なインフラ・サーバー構築ならば数日あれば、iOSクライアント開発がなければ1日もあれば十分構築できます。

グロースの方がプロダクトローンチよりも難しい一方で、ローンチ時の仕様・技術選定が何よりグロースを握っていると思います。

そのため経験豊富なインフラエンジニアが必要かと言われればそうは思いません。 宣言的継続的デリバリーを採用すれば、チームメンバー全員がインフラエンジニアになれチームの開発速度は爆上がりでしょう。

「ソフトの価値をユーザーに継続的に届ける」ことが継続的デリバリーの真価であり、ビジネス面にも有用であることは間違いありません。

宣言的継続的デリバリーについては以下の記事をご覧ください。

www.1915keke.com