本記事
私は、いくつか家内サービスを運用しています。 例えばCloud FunctionsとCloud SQLを使った「家計簿」を家族全員が使っていますし、自宅に「監視カメラ」をおいてLineBotで適宜確認できるようになっています。
無料枠を使い回しているのでデータ移行が定期的にありますが、そのときによくEmbulkを使っています。
非常に短い備忘録ですが、以下にまとめています。
この記事は短すぎて適当に見えるのですが、それくらいシステムは簡単だったことが印象でした。
今回はEmbulkと同じくTeasureDataで開発が進められているDigdagを使ってみて、どのような開発のしやすさなのかなど知識を増やしたいと思います。
Digdagについて
一体何をするもの?
Digdagとは、トレジャーデータが公開したオープンソースなワークフローマネージャーです
設定ファイルは一般的なYAMLではなくて.dig
であり、どうやらMapのキーの順番が維持される必要があるかららしいです。
目的としては
- cron
- ITオペレーションの自動化
- バッチジョブによるデータ分析
などを置き換えるために作られています。手作業で行っていた作業を自動化することによって、様々な恩恵が受けられると思います。
特徴
1. 簡単に開発できる
バイナリーをダウンロードすればすぐに開発できます。
周知の如くですが、やはり開発しやすさと開発スピードには相関があり、それはビジネスを進める上でも非常に重要です。
2. 簡単な設定
YAMLによって書かれているため非常に分かりやすいです。
YAMLは広く使われていてCircleCIやKubernetesをはじめとするありとあらゆるものに使われています。
3. 依存関係の解決
依存関係がある読み書きできるワークフローになっています。CrobJobのように時刻で動作するようなものから解放されます。
4. マルチクラウド
Amazon Redshift/S3をはじめBigQuery/ Could Storageやオンプレミスなデータベースからデータを扱えます。
5. 多言語サポート
現在はPythonとRubyをネイティブサポートしています。
データサイエンティストとWeb開発者が協力することができます。
6. エラーハンドリング
ワークフローが失敗してしまったとき、Digdagはロバストなエラーハンドリングを提供します。
7. モジュール性
それぞれのワークフローが再利用可能なものになるようにDigdagはデザインされています。
8. 拡張可能
Pluginによって拡張できるようになっています。自分のオペレータやコントロールフローを定義できる。
9. 管理者UI
GUIによって管理ができるようになっているので、すべてをコマンドラインからする必要がないです。
10. Secretsを管理
DigdagはSecretを安全に管理できるようにできる。
11. Dockerをサポート
DigdagはDockerコンテナに定義されてあるDockerコンテナをサポートしているので、環境に依存せずにタスクをこなすことができます。
12. バージョン管理
Digdagはワークフローをコードとして宣言的に書くことができます。
アーキテクチャ
ワークフローによってありとあらゆるタスクを自動化できます。
タスクはオペレータープラグインを使って定義されます。使えるオペレーター一覧は以下の通りです。
Operators — Digdag 0.9.33 documentation
これからアーキテクチャについて説明していきます。
1. グループによるタスク管理
[http://docs.digdag.io/images/grouping-tasks.png:image=http://docs.digdag.io/images/grouping-tasks.png]
Digdagでは複雑なワークフローを自動化できる一方で、管理をしなければなりません。
しかし、グループという概念でタスクをまとめることができ、直感的にワークフローを理解することが容易になります。
親子の関係であり、グループが実行されるとそのグループに含まれる子であるタスクがすべて実行されます。タスクが成功するとグループが成功したとされます。
2. パラメータ
グループにタスクをまとめることは、タスク間でパラメータを渡すために使われています。
親のタスクはexport
することによって変数をこのタスクに渡すことができます。
3. ワークフロー as Code
「宣言的に書ける」よとうことです。
例えばGithub上にあるワークフローは実際のデプロイされているワークフローをそのまま表していることになります。
逆も然りです。実際のワークフローはGithub上のコードを記述しているはずです。
4. Docker上でタスクを実行できる
Dockerの上でタスクを実行するので実行環境の差分によるエラーなどを防ぐことができます。
例えばDockerを使う場合は以下のように書きます。
_export: docker: image: ubuntu:latest pull_always: true +step1: py>: tasks.MyWorkflow.step1
用語
Task
あまり説明が必要ではないかもしれませんが、アトミックな実行単位です。名前は一意になるようにしなければなりません。
実際に何をするかは定義次第です。
一例としてあげます。
# my_workflow.dig +load: +from_mysql: +tables: ... +from_postgres: ... +dump: ...
このようにすると特定のタスクは親のタスクがプレフィックスについた名前で指定されます。例えば+my_workflow+load+from_mysql+tables
,+my_workflow+load+from_postgres
のようになるということです。
ワークフローを作ってみよう
DigdagをInstallする
まず、以下のコマンドを使用してインストールします。
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" $ chmod +x ~/bin/digdag $ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
ここで確認のために
digdag --version
と実行してみてください。
私の場合はFishシェルで実行しているためエラーが出ました。
Failed to execute process '/Users/keisukeyamashita/bin/digdag'. Reason: exec: Exec format error The file '/Users/keisukeyamashita/bin/digdag' is marked as an executable but could not be run by the operating system.
これはIssueにもあがっています。
以下のaliasを登録します。
alias digdag="java -jar ~/bin/digdag"
すると使えるようになると思います。
digdag --version 0.9.31
これで始めることができます。
簡単なワークフローを作ってみる
まず、これまでのディレクトリは以下の通りになっています。
. └── README.md
そして以下のようにdigdag init
コマンドで初期化します。
digdag init hello-world-workflow
すると以下のようなディレクトリ構成になっています。
. ├── README.md └── hello-world-workflow └── hello-world-workflow.dig
また、実際の中身の.dig
ファイルは以下のようになっています。
timezone: UTC +setup: echo>: start ${session_time} +disp_current_date: echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')} +repeat: for_each>: order: [first, second, third] animal: [dog, cat] _do: echo>: ${order} ${animal} _parallel: true +teardown: echo>: finish ${session_time}
これを実行するにはdigdag run
コマンドを実行せればよいです。
digdag run hello-world-workflow.dig
すると以下のようになります。
2018-12-27 22:34:53 +0900: Digdag v0.9.31 2018-12-27 22:34:54 +0900 [WARN] (main): Using a new session time 2018-12-27T00:00:00+00:00. 2018-12-27 22:34:54 +0900 [INFO] (main): Using session /Users/keisukeyamashita/Dropbox/go/src/github.com/KeisukeYamashita/test-digdag/hello-world-workflow/.digdag/status/20181227T000000+0000. 2018-12-27 22:34:54 +0900 [INFO] (main): Starting a new session project id=1 workflow name=hello-world-workflow session_time=2018-12-27T00:00:00+00:00 2018-12-27 22:34:55 +0900 [INFO] (0018@[0:default]+hello-world-workflow+setup): echo>: start 2018-12-27T00:00:00+00:00 start 2018-12-27T00:00:00+00:00 2018-12-27 22:34:56 +0900 [INFO] (0018@[0:default]+hello-world-workflow+disp_current_date): echo>: 2018-12-27 00:00:00 +00:00 2018-12-27 00:00:00 +00:00 2018-12-27 22:34:56 +0900 [INFO] (0018@[0:default]+hello-world-workflow+repeat): for_each>: {order=[first, second, third], animal=[dog, cat]} 2018-12-27 22:34:56 +0900 [INFO] (0018@[0:default]+hello-world-workflow+repeat^sub+for-0=order=0=first&1=animal=0=dog): echo>: first dog first dog 2018-12-27 22:34:56 +0900 [INFO] (0020@[0:default]+hello-world-workflow+repeat^sub+for-0=order=1=second&1=animal=0=dog): echo>: second dog second dog 2018-12-27 22:34:56 +0900 [INFO] (0022@[0:default]+hello-world-workflow+repeat^sub+for-0=order=2=third&1=animal=0=dog): echo>: third dog third dog 2018-12-27 22:34:56 +0900 [INFO] (0023@[0:default]+hello-world-workflow+repeat^sub+for-0=order=2=third&1=animal=1=cat): echo>: third cat third cat 2018-12-27 22:34:56 +0900 [INFO] (0021@[0:default]+hello-world-workflow+repeat^sub+for-0=order=1=second&1=animal=1=cat): echo>: second cat second cat 2018-12-27 22:34:56 +0900 [INFO] (0019@[0:default]+hello-world-workflow+repeat^sub+for-0=order=0=first&1=animal=1=cat): echo>: first cat first cat 2018-12-27 22:34:57 +0900 [INFO] (0019@[0:default]+hello-world-workflow+teardown): echo>: finish 2018-12-27T00:00:00+00:00 finish 2018-12-27T00:00:00+00:00 Success. Task state is saved at /Users/keisukeyamashita/Dropbox/go/src/github.com/KeisukeYamashita/test-digdag/hello-world-workflow/.digdag/status/20181227T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
Workflowの体系的な話
1. Workflowの定義
Workflowは.dig
のように指定されます。そしてファイル名があなたのワークフロー名になります。
トップレベルにtimezone
でどの時間帯かを指定します。デフォルトではUTC
ですが、Asia/Tokyo
などを指定することができます。
最初にdigdag init
で初期化した時にできたワークフローの一行目です。
timezone: UTC
2. タスクの定義
タスクは+
を使って定義し、上から下に実行されます。
3. オペレータを使う
タスクはオペレーターのタイプ名>
のように書かれています。シェルスクリプトを実行したり、Pythonスクリプトを実行したり、メールを送ったり、いろんなことができます。
4. 変数を使う
すでにEmbedしている変数があります。
例えば以下のようなものです。
timezone
session_id
task_name
などです。詳しくは以下のリンクをご覧ください。
Workflow definition — Digdag 0.9.33 documentation
5. 計算をする
${}
の中で基本的なJavascriptの計算はできます。また、時間の計算にはMoment.jsが使われています。
例えば一つのタスク内で以下のような計算ができます。
echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}
6. 変数を定義する
自分で変数を定義したいときがあるとしましょう。
三つの方法で変数を定義できます。
_export
で定義する- APIを使って定義する
- Sessionを開始するときに定義する
Exportで定義するときは以下のようにします。
_export: foo: 1 +prepare: py>: tasks.MyWorkflow.prepare
APIを使う場合は以下の通りです。
class MyWorkflow(object): def prepare(self): digdag.env.store({"my_param": 2})
このようにdigdag.env.store(dict)
で引数に辞書型を渡してあげます。
7. 他のdig
ファイルをimportする
以下のように他のファイルを!include
することによってimportすることができます。
_export: mysql: !include : 'config/mysql.dig' hive: !include : 'config/hive.dig' !include : 'tasks/foo.dig'
8. 並列処理
並列に処理をすることができます。方法としては、タスクに対して_parallel: true
とすることによってグループのタスクを並列処理できます。
例えば、以下のように定義します。
+prepare: _parallel: true +data1: sh>: tasks/prepare_data1.sh +data2: sh>: tasks/prepare_data2.sh +data3: sh>: tasks/prepare_data3.sh +analyze: sh>: tasks/analyze_prepared_data_sets.sh
特定のタスクのみを並列処理したければ、_backgroud:true
と追記することによって定義することができます。
+prepare: +data1: sh>: tasks/prepare_data1.sh # +data1 and +data2 run in parallel. +data2: _background: true sh>: tasks/prepare_data2.sh +data3: sh>: tasks/prepare_data3.sh
9. 自動的に失敗したタスクのリトライ
_retry:N
というリトライパラメータが設定できます。このN
というのは`1, 2, 3 ...
です。
失敗してリトライできる回数を設定できます。
+prepare: # If +erase_table, +load_data, or +check_loaded_data fail, it retries from # +erase_table again. _retry: 3 +erase_table: sh>: tasks/erase_table.sh +load_data: sh>: tasks/load_data.sh +check_loaded_data: sh>: tasks/check_loaded_data.sh +analyze: sh>: tasks/analyze_prepared_data_sets.sh
公式ドキュメントにもあるように、失敗するとそのグループの最初のタスクから実行されます。
特定のタスクに_retry: N
を定義することもできます。
10. Error 通知
失敗すると以下のように_error
で定義したタスクを走られすことができます。
# this task runs when a workflow fails. _error: sh>: tasks/runs_when_workflow_failed.sh +analyze: sh>: tasks/analyze_prepared_data_sets.sh
11. スケジューリング
11.1 文法
ワークフローを定期的に行うには以下のようにschedule:
で定義する必要があります。
timezone: UTC schedule: daily>: 07:00:00 +step1: sh>: tasks/shell_sample.sh
どのように定義すうかは公式ドキュメントを参考にしてください。
もちろんcron
のフォーマットでも指定をすることができます。
11.2 スケジューラーを実行する
digdag scheduler
でコマンドを実行できます。
ファイルが変更していても、自動的にロードされます。
11.2 実行にかかった時間をアラートする
あるワークフローに、普段なら2分くらいしかかからないのに1時間などかかっていたら何かがおかしいことが想定できます。
そのようなために、time
とうものが指定できます。
timezone: UTC schedule: daily>: 07:00:00 sla: # triggers this task at 02:00 time: 02:00 +notice: sh>: notice.sh +long_running_job: sh>: long_running_job.sh
11.3 セッションのスキップ
例えば30分毎に実行されているワークフローがあり、普段は20分かかって終了するが、データ数が以上に多い祝日などは20分以上かかってしまうかもしれません。
そのようなときにskip_on_overtime: true | false
を設定することによって、複数のWorkflowが同時に実行されるのかを指定できます。
DigdagのGUIを使ってみる
Digdagには開発時につかうローカルモードと、運用時に使うサーバーモードがあります。
ローカルモードでは単一の.dig
で定義されたワークフローを実行できます。
[分散ワークフローエンジン『Digdag』の実装 at Tokyo RubyKaigi #11]
またサーバーモードでは以下のようなアーキテクチャになっています。
[分散ワークフローエンジン『Digdag』の実装 at Tokyo RubyKaigi #11]
まずは、実際にGUIを提供するサーバーを立ててみます。
digdag server --memory
そして、以下のようにワークフローをAPIを通して保存します。
digdag push hello-world-workflow
他のディレクトリにいるなら---project
で指定してください。
そしてhttp://localhost:65432
にアクセスをします。
すると以下のようにワークフローがpush
されています。
そして選択してみると以下のようになっています。
実際に実行しています。
するとセッションができます。
StatusのSuccessボタンを指定すると、結果をみることができます。
特にforeach
などが並列処理されていることがわかります。
ログレベルはPushのときに指定できます。
例えば
--log-level=info
といったような感じです。必要に応じて行ってください。
参考文献
www.slideshare.net