Kekeの日記

エンジニア、読書なんでも

Digdagでワークフローを宣言的に書く!

本記事

私は、いくつか家内サービスを運用しています。 例えばCloud FunctionsとCloud SQLを使った「家計簿」を家族全員が使っていますし、自宅に「監視カメラ」をおいてLineBotで適宜確認できるようになっています。

無料枠を使い回しているのでデータ移行が定期的にありますが、そのときによくEmbulkを使っています。

非常に短い備忘録ですが、以下にまとめています。

www.1915keke.com

この記事は短すぎて適当に見えるのですが、それくらいシステムは簡単だったことが印象でした。

今回はEmbulkと同じくTeasureDataで開発が進められているDigdagを使ってみて、どのような開発のしやすさなのかなど知識を増やしたいと思います。

Digdagについて

一体何をするもの?

https://camo.qiitausercontent.com/9f383de06b4507b7f860521f18dea05734086a69/68747470733a2f2f7777772e6469676461672e696f2f77702d636f6e74656e742f75706c6f6164732f323031362f31312f4469674461675f686f72697a6f6e74616c2e706e67

Digdagとは、トレジャーデータが公開したオープンソースなワークフローマネージャーです

設定ファイルは一般的なYAMLではなくて.digであり、どうやらMapのキーの順番が維持される必要があるかららしいです。

目的としては

  • cron
  • ITオペレーションの自動化
  • バッチジョブによるデータ分析

などを置き換えるために作られています。手作業で行っていた作業を自動化することによって、様々な恩恵が受けられると思います。

特徴

1. 簡単に開発できる

バイナリーをダウンロードすればすぐに開発できます。

周知の如くですが、やはり開発しやすさと開発スピードには相関があり、それはビジネスを進める上でも非常に重要です。

2. 簡単な設定

YAMLによって書かれているため非常に分かりやすいです。

YAMLは広く使われていてCircleCIやKubernetesをはじめとするありとあらゆるものに使われています。

3. 依存関係の解決

依存関係がある読み書きできるワークフローになっています。CrobJobのように時刻で動作するようなものから解放されます。

4. マルチクラウド

Amazon Redshift/S3をはじめBigQuery/ Could Storageやオンプレミスなデータベースからデータを扱えます。

5. 多言語サポート

現在はPythonとRubyをネイティブサポートしています。

データサイエンティストとWeb開発者が協力することができます。

6. エラーハンドリング

ワークフローが失敗してしまったとき、Digdagはロバストなエラーハンドリングを提供します。

7. モジュール性

それぞれのワークフローが再利用可能なものになるようにDigdagはデザインされています。

8. 拡張可能

Pluginによって拡張できるようになっています。自分のオペレータやコントロールフローを定義できる。

9. 管理者UI

GUIによって管理ができるようになっているので、すべてをコマンドラインからする必要がないです。

10. Secretsを管理

DigdagはSecretを安全に管理できるようにできる。

11. Dockerをサポート

DigdagはDockerコンテナに定義されてあるDockerコンテナをサポートしているので、環境に依存せずにタスクをこなすことができます。

12. バージョン管理

Digdagはワークフローをコードとして宣言的に書くことができます。

アーキテクチャ

ワークフローによってありとあらゆるタスクを自動化できます。

タスクはオペレータープラグインを使って定義されます。使えるオペレーター一覧は以下の通りです。

Operators — Digdag 0.9.5 documentation

これからアーキテクチャについて説明していきます。

1. グループによるタスク管理

[http://docs.digdag.io/images/grouping-tasks.png:image=http://docs.digdag.io/images/grouping-tasks.png]

Digdagでは複雑なワークフローを自動化できる一方で、管理をしなければなりません。

しかし、グループという概念でタスクをまとめることができ、直感的にワークフローを理解することが容易になります。

親子の関係であり、グループが実行されるとそのグループに含まれる子であるタスクがすべて実行されます。タスクが成功するとグループが成功したとされます。

2. パラメータ

グループにタスクをまとめることは、タスク間でパラメータを渡すために使われています。

親のタスクはexportすることによって変数をこのタスクに渡すことができます。

3. ワークフロー as Code

「宣言的に書ける」よとうことです。

例えばGithub上にあるワークフローは実際のデプロイされているワークフローをそのまま表していることになります。

逆も然りです。実際のワークフローはGithub上のコードを記述しているはずです。

4. Docker上でタスクを実行できる

Dockerの上でタスクを実行するので実行環境の差分によるエラーなどを防ぐことができます。

例えばDockerを使う場合は以下のように書きます。

_export:
  docker:
    image: ubuntu:latest
    pull_always: true

+step1:
  py>: tasks.MyWorkflow.step1

用語

Task

あまり説明が必要ではないかもしれませんが、アトミックな実行単位です。名前は一意になるようにしなければなりません。

実際に何をするかは定義次第です。

一例としてあげます。

# my_workflow.dig
+load:
  +from_mysql:
    +tables:
      ...
  +from_postgres:
    ...
+dump:
  ...

このようにすると特定のタスクは親のタスクがプレフィックスについた名前で指定されます。例えば+my_workflow+load+from_mysql+tables,+my_workflow+load+from_postgresのようになるということです。

ワークフローを作ってみよう

DigdagをInstallする

まず、以下のコマンドを使用してインストールします。

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc

ここで確認のために

digdag --version

と実行してみてください。

私の場合はFishシェルで実行しているためエラーが出ました。

Failed to execute process '/Users/keisukeyamashita/bin/digdag'. Reason:
exec: Exec format error
The file '/Users/keisukeyamashita/bin/digdag' is marked as an executable but could not be run by the operating system.

これはIssueにもあがっています。

github.com

以下のaliasを登録します。

alias digdag="java -jar ~/bin/digdag"

すると使えるようになると思います。

digdag --version

0.9.31

これで始めることができます。

簡単なワークフローを作ってみる

まず、これまでのディレクトリは以下の通りになっています。

.
└── README.md

そして以下のようにdigdag initコマンドで初期化します。

digdag init hello-world-workflow

すると以下のようなディレクトリ構成になっています。

.
├── README.md
└── hello-world-workflow
    └── hello-world-workflow.dig

また、実際の中身の.digファイルは以下のようになっています。

timezone: UTC

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')}

+repeat:
  for_each>:
    order: [first, second, third]
    animal: [dog, cat]
  _do:
    echo>: ${order} ${animal}
  _parallel: true

+teardown:
  echo>: finish ${session_time}

これを実行するにはdigdag runコマンドを実行せればよいです。

digdag run hello-world-workflow.dig 

すると以下のようになります。

2018-12-27 22:34:53 +0900: Digdag v0.9.31
2018-12-27 22:34:54 +0900 [WARN] (main): Using a new session time 2018-12-27T00:00:00+00:00.
2018-12-27 22:34:54 +0900 [INFO] (main): Using session /Users/keisukeyamashita/Dropbox/go/src/github.com/KeisukeYamashita/test-digdag/hello-world-workflow/.digdag/status/20181227T000000+0000.
2018-12-27 22:34:54 +0900 [INFO] (main): Starting a new session project id=1 workflow name=hello-world-workflow session_time=2018-12-27T00:00:00+00:00
2018-12-27 22:34:55 +0900 [INFO] (0018@[0:default]+hello-world-workflow+setup): echo>: start 2018-12-27T00:00:00+00:00
start 2018-12-27T00:00:00+00:00
2018-12-27 22:34:56 +0900 [INFO] (0018@[0:default]+hello-world-workflow+disp_current_date): echo>: 2018-12-27 00:00:00 +00:00
2018-12-27 00:00:00 +00:00
2018-12-27 22:34:56 +0900 [INFO] (0018@[0:default]+hello-world-workflow+repeat): for_each>: {order=[first, second, third], animal=[dog, cat]}
2018-12-27 22:34:56 +0900 [INFO] (0018@[0:default]+hello-world-workflow+repeat^sub+for-0=order=0=first&1=animal=0=dog): echo>: first dog
first dog
2018-12-27 22:34:56 +0900 [INFO] (0020@[0:default]+hello-world-workflow+repeat^sub+for-0=order=1=second&1=animal=0=dog): echo>: second dog
second dog
2018-12-27 22:34:56 +0900 [INFO] (0022@[0:default]+hello-world-workflow+repeat^sub+for-0=order=2=third&1=animal=0=dog): echo>: third dog
third dog
2018-12-27 22:34:56 +0900 [INFO] (0023@[0:default]+hello-world-workflow+repeat^sub+for-0=order=2=third&1=animal=1=cat): echo>: third cat
third cat
2018-12-27 22:34:56 +0900 [INFO] (0021@[0:default]+hello-world-workflow+repeat^sub+for-0=order=1=second&1=animal=1=cat): echo>: second cat
second cat
2018-12-27 22:34:56 +0900 [INFO] (0019@[0:default]+hello-world-workflow+repeat^sub+for-0=order=0=first&1=animal=1=cat): echo>: first cat
first cat
2018-12-27 22:34:57 +0900 [INFO] (0019@[0:default]+hello-world-workflow+teardown): echo>: finish 2018-12-27T00:00:00+00:00
finish 2018-12-27T00:00:00+00:00
Success. Task state is saved at /Users/keisukeyamashita/Dropbox/go/src/github.com/KeisukeYamashita/test-digdag/hello-world-workflow/.digdag/status/20181227T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

Workflowの体系的な話

1. Workflowの定義

Workflowは.digのように指定されます。そしてファイル名があなたのワークフロー名になります。

トップレベルにtimezoneでどの時間帯かを指定します。デフォルトではUTCですが、Asia/Tokyoなどを指定することができます。

最初にdigdag initで初期化した時にできたワークフローの一行目です。

timezone: UTC

2. タスクの定義

タスクは+を使って定義し、上から下に実行されます。

3. オペレータを使う

タスクはオペレーターのタイプ名>のように書かれています。シェルスクリプトを実行したり、Pythonスクリプトを実行したり、メールを送ったり、いろんなことができます。

4. 変数を使う

すでにEmbedしている変数があります。

例えば以下のようなものです。

  • timezone
  • session_id
  • task_name

などです。詳しくは以下のリンクをご覧ください。

Workflow definition — Digdag 0.9.5 documentation

5. 計算をする

${}の中で基本的なJavascriptの計算はできます。また、時間の計算にはMoment.jsが使われています。

例えば一つのタスク内で以下のような計算ができます。

echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}

6. 変数を定義する

自分で変数を定義したいときがあるとしましょう。

三つの方法で変数を定義できます。

  • _exportで定義する
  • APIを使って定義する
  • Sessionを開始するときに定義する

Exportで定義するときは以下のようにします。

_export:
  foo: 1

+prepare:
  py>: tasks.MyWorkflow.prepare

APIを使う場合は以下の通りです。

class MyWorkflow(object):
  def prepare(self):
    digdag.env.store({"my_param": 2})

このようにdigdag.env.store(dict)で引数に辞書型を渡してあげます。

7. 他のdigファイルをimportする

以下のように他のファイルを!includeすることによってimportすることができます。

_export:
  mysql:
    !include : 'config/mysql.dig'
  hive:
    !include : 'config/hive.dig'

!include : 'tasks/foo.dig'

8. 並列処理

並列に処理をすることができます。方法としては、タスクに対して_parallel: trueとすることによってグループのタスクを並列処理できます。

例えば、以下のように定義します。

+prepare:
  _parallel: true

  +data1:
    sh>: tasks/prepare_data1.sh

  +data2:
    sh>: tasks/prepare_data2.sh

  +data3:
    sh>: tasks/prepare_data3.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

特定のタスクのみを並列処理したければ、_backgroud:trueと追記することによって定義することができます。

+prepare:
  +data1:
    sh>: tasks/prepare_data1.sh

  # +data1 and +data2 run in parallel.
  +data2:
    _background: true
    sh>: tasks/prepare_data2.sh

  +data3:
    sh>: tasks/prepare_data3.sh

9. 自動的に失敗したタスクのリトライ

_retry:Nというリトライパラメータが設定できます。このNというのは`1, 2, 3 ...です。

失敗してリトライできる回数を設定できます。

+prepare:
  # If +erase_table, +load_data, or +check_loaded_data fail, it retries from
  # +erase_table again.
  _retry: 3

  +erase_table:
    sh>: tasks/erase_table.sh

  +load_data:
    sh>: tasks/load_data.sh

  +check_loaded_data:
    sh>: tasks/check_loaded_data.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

公式ドキュメントにもあるように、失敗するとそのグループの最初のタスクから実行されます。

特定のタスクに_retry: Nを定義することもできます。

10. Error 通知

失敗すると以下のように_errorで定義したタスクを走られすことができます。

# this task runs when a workflow fails.
_error:
  sh>: tasks/runs_when_workflow_failed.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

11. スケジューリング

11.1 文法

ワークフローを定期的に行うには以下のようにschedule:で定義する必要があります。

timezone: UTC

schedule:
  daily>: 07:00:00

+step1:
  sh>: tasks/shell_sample.sh

どのように定義すうかは公式ドキュメントを参考にしてください。

もちろんcronのフォーマットでも指定をすることができます。

11.2 スケジューラーを実行する

digdag schedulerでコマンドを実行できます。

ファイルが変更していても、自動的にロードされます。

11.2 実行にかかった時間をアラートする

あるワークフローに、普段なら2分くらいしかかからないのに1時間などかかっていたら何かがおかしいことが想定できます。

そのようなために、timeとうものが指定できます。

timezone: UTC

schedule:
  daily>: 07:00:00

sla:
  # triggers this task at 02:00
  time: 02:00
  +notice:
    sh>: notice.sh

+long_running_job:
  sh>: long_running_job.sh
11.3 セッションのスキップ

例えば30分毎に実行されているワークフローがあり、普段は20分かかって終了するが、データ数が以上に多い祝日などは20分以上かかってしまうかもしれません。

そのようなときにskip_on_overtime: true | falseを設定することによって、複数のWorkflowが同時に実行されるのかを指定できます。

DigdagのGUIを使ってみる

Digdagには開発時につかうローカルモードと、運用時に使うサーバーモードがあります。

ローカルモードでは単一の.digで定義されたワークフローを実行できます。

f:id:bobchan1915:20181228044736p:plain

[分散ワークフローエンジン『Digdag』の実装 at Tokyo RubyKaigi #11]

またサーバーモードでは以下のようなアーキテクチャになっています。

f:id:bobchan1915:20181228044830p:plain

[分散ワークフローエンジン『Digdag』の実装 at Tokyo RubyKaigi #11]

まずは、実際にGUIを提供するサーバーを立ててみます。

digdag server --memory

そして、以下のようにワークフローをAPIを通して保存します。

digdag push hello-world-workflow

他のディレクトリにいるなら---projectで指定してください。

そしてhttp://localhost:65432にアクセスをします。

すると以下のようにワークフローがpushされています。

f:id:bobchan1915:20181228045031p:plain

そして選択してみると以下のようになっています。

f:id:bobchan1915:20181228045212p:plain

実際に実行しています。

するとセッションができます。

f:id:bobchan1915:20181228045437p:plain

StatusのSuccessボタンを指定すると、結果をみることができます。

f:id:bobchan1915:20181228045523p:plain

特にforeachなどが並列処理されていることがわかります。

ログレベルはPushのときに指定できます。

例えば

--log-level=info

といったような感じです。必要に応じて行ってください。

参考文献

www.slideshare.net