DAG (Directed Acyclic Graph)

DAG(Directed Acyclic Graph)は、有向非巡回グラフ型のワークフローモデルです。

DAGは、処理タスク間の依存関係を定義したワークフローで以下のようなイメージになります。この例では、DAGフローが開始されるとtask-aとtask-bが並列実行され、双方が完了するとtask-cが実行されます。task-cが完了するとtask-dとtask-eが並列実行され、task-dが完了するとtask-fを実行して終了します。


DAG管理モデル

DAGプラグインは、DAGオブジェクトとDAGTaskオブジェクトで構成されます。DAGはワークフローで処理するタスクをDAGTaskとしてグラフ構造で管理します。DAGTaskは処理実装への参照(Operator)を保持しています。 タスクの処理実装への参照であるOperatorは、4種類存在します。

  1. FunctionPluginOperator: 特定のFunctionプラグインへの参照

  2. ModulePluginOperator: 特定のModuleプラグインに含まれる特定の関数への参照

  3. ThirdPartyPluginOperator: 特定の持ち込みpythonパッケージ内の特定モジュールに含まれる特定の関数への参照

  4. DagPluginOperator: 特定のDAGプラグインへの参照


DAGオブジェクトは定義オブジェクトであることから状態を保持しません。DAGを実行し、ワークフローを稼働させるにはDAGInstanceオブジェクトを生成します。これはDAGオブジェクトの定義を引き継ぎ、配下に各DAGTaskの実行状態を管理するDAGTaskInstanceを生成します。DAGTaskInstanceはOperatorを通じてタスク処理の実行管理を行います。

従ってDAGInstance生成後にOperatorが参照している処理実装を変更した場合は、最新の処理内容で実行される点に注意が必要です。
管理モデルの概念図を以下に記載しています。

タスクに設定されたOperatorが例外を発出し、失敗した場合にロールバック処理を実行させることができます。ロールバック処理はDAGTaskにロールバックのためのOperatorを指定することで任意の処理を実装できます。


DAGプラグインが保有する属性

属性 概要 備考
workspace ワークスペースを指定します。 -
name 名前を指定します。 ユニークである必要があります。
category カテゴリを指定します。 -
dag 構成要素となるタスクの構造定義です。 -
dag.identifier DAGタスクの識別子を指定します。 ^[a-zA-Z][a-zA-Z_0-9-]{1,32}$の正規表現に合致する文字列のみ許容します。また同一DAG内でユニークである必要があります。
dag.identifier.dependsOn DAGタスクの依存先タスクの識別子のリストを指定します。 -
dag.identifier.operator DAGタスクで実行する処理を提供するオペレータを定義します。 -
dag.identifier.operator.operatorType DAGタスクで実行する処理を提供するオペレータの種別を指定します。 選択可能な種別は、FunctionPluginOperatorModulePluginOperatorThirdPartyPluginOperatorDagPluginOperatorのいずれかです。
dag.identifier.operator.package dag.identifier.operator.operatorTypeにてThirdPartyPluginOperatorを選択した場合に、持ち込みpythonライブラリのpackage名を指定します。 -
dag.identifier.operator.modulename dag.identifier.operator.operatorTypeにてThirdPartyPluginOperatorModulePluginOperatorを選択した場合に、持ち込みpythonライブラリ内のモジュール名やModuleプラグイン名を指定します。 -
dag.identifier.operator.funcname DAGフローがdownstream方向に稼働している場合に当該タスクが実行する関数を指定します。 FunctionPluginOperatorの場合は、Functionプラグインの名前を指定します。
ModulePluginOperatorの場合は、Moduleプラグイン内の関数名を指定します。
ThirdPartyPluginOperatorの場合は、持ち込みpythonライブラリ内の関数名を指定します。
dag.identifier.operator.reverseFuncname DAGフローがupstream方向に稼働している場合に当該タスクが実行する関数を指定します。 FunctionPluginOperatorの場合は、Functionプラグインの名前を指定します。ModulePluginOperatorの場合は、Moduleプラグイン内の関数名を指定します。ThirdPartyPluginOperatorの場合は、持ち込みpythonライブラリ内の関数名を指定します。
dag.identifier.operator.dagName dag.identifier.operator.operatorTypeにてDagPluginOperatorを選択した場合に、実行対象となるDAG名を指定します。 本タスクの実行時に新たなDAGインスタンスが生成され、本タスクによって実行管理されます。起動元タスクの実行時点のmetadataが起動先DAGインスタンスに渡されます。起動先DAGインスタンスがmetadataを書き換えても起動元DAGインスタンスには影響しません。
dag.identifier.operator.synchronized 本タスクが実行するDAGインスタンスを同期実行するか、非同期実行するかのフラグを指定します。
デフォルトはTrueです。
Trueの場合、生成/実行されたDAGインスタンスがDone状態となるまで該当タスクは完了しません。
尚、後述するDAGのRewind機能によって該当タスクが巻き戻される場合、生成されたDAGインスタンスはリバース実行されます。巻き戻し後に再実行されると、新たなDAGインスタンスの生成は行わず、既に生成済みのDAGインスタンスが再利用されます。
起動元、起動先のDAGインスタンスは主従関係を持ち、起動元によって起動先のインスタンスは管理されます。また、起動先のDAGインスタンスを直接削除することも許可されません。起動元インスタンスの削除と同時に削除されます。

Falseの場合、生成/実行されたDAGインスタンスの実行状態とは無関係に該当タスクは完了します。
ワークフローの途中で新たなワークフローをフォークするだけの動作となり、DAGインスタンス同士は互いに干渉しません。
dag.identifier.label 当該タスクの注釈や処理概要を記載できるユーザラベルです。 DAGワークフローの挙動には無関係です。
dag.identifier.approvalRequired 当該タスクの実行に際してユーザからの承認行為が必要か否かを指定するフラグです。
デフォルトはFalseです。
-
dag.identifier.approvalSchema 承認待ちタスクの承認時に承認者からの入力を受け付ける場合の入力スキーマを定義できます。 入力されたデータは、DAGインスタンスのmetadataにマージされ、タスク処理の中で参照することができます。
dag.identifier.skipSchema 承認待ちタスクをスキップする際に承認者からの入力を受け付ける場合の入力スキーマを定義できます。 入力されたデータは、DAGインスタンスのmetadataにマージされ、タスク処理の中で参照することができます。
dag.identifier.idempotency 当該タスクの冪等性の有無を宣言するフラグです。
デフォルトはFalseです。
DAGワークフローが走行中にワークロード破壊が発生するとクラッシュリカバリが作動します。クラッシュリカバリの際に当該タスクの本フラグが True設定であれば再実行されます。
False設定の場合は再実行せずに Failed状態に遷移します。
dag.identifier.runCondition 当該タスクが実行可能か否かを判定する条件を指定できるフィールドです。pythonの条件式を記述します。 例えば、dryrun==Falseと記述した場合、当該タスクに到達した時点のmetadataにdryrunパラメータが設定されていて且つ、Falseの場合にタスクが実行されます。Trueもしくはdryrunパラメータが未定義の場合は、タスクはパス(Passed)されます。パスされた場合、本タスクに依存している下流のタスクもパスされます。パスされたタスクの下流に存在し、自身の上流タスクが全てパスされている場合パス対象となります。これによってワークフローの分岐を表現することができます。
metadata DAGワークフローの実行時名前空間の初期値です。 任意の辞書を指定できます。DAGタスクで利用するOperatorの関数定義情報から関数の引数を本metadata空間から自動バインドします。
autoReverseOnFailure DAGワークフローがdownstream走行中にタスク実行が失敗した際に自動的にupstream走行を開始するフラグです。
デフォルトはFalseです。
-

Note

dag.identifier.operator.operatorTypeで設定できる種別ThirdPartyPluginOperatorは、v23.2LTSで新たに導入されたOperatorです。
ThirdPartyPluginOperatorを利用する場合は、持ち込みpythonライブラリのディレクトリをプラグインディレクトリ配下に配置し、SDK起動パラメータの--extension_base_dirに持ち込みpythonライブラリへのパスを指定する必要があります。
例: --extension_base_dir=/var/plugins/example/myLib

Note

dag.identifier.operator.operatorTypeで設定できる種別DagPluginOperatorは、v23.2LTSで新たに導入されたOperatorです。


Rewind機能

DAGワークフローでは有向非巡回グラフで実行順序を決定し、並列実行するため、ループ構造には一定の制約があります。
以下の原則に則り、並列タスクに影響を及ぼさない、且つフローを複雑化させない範囲で部分的にタスクを巻き戻し、反復させることが可能です。

Rewindの原則

  • 分岐点へのRewindは、禁止(並列タスクに影響有りのため)
  • 分岐点を跨ぐRewindは、禁止(並列タスクに影響有りのため)
  • 合流点からのRewindは、禁止(複雑化するため合流前に完結すべき)
  • 合流点を跨ぐRewindは、禁止(複雑化するため合流前に完結すべき)

下図では、Rewindの原則に従って各タスクからRewind可能なパタンを点線で表現しています。

Note

Rewindは、v23.2LTSで新たに導入された機能です。


特殊例外

DagRewind例外

当該DAGタスクの処理途中でタスクを巻き戻してやり直したい場合にスローする例外です。
キーワード引数には、modestepを指定することができます。

Rewindモードは以下の2種類が規定されています。デフォルトはGO_BACK_TO_LIMITです。

  • GO_BACK_TO_LIMIT: 巻き戻し限界点まで戻る

  • GO_BACK_N_STEPS: 指定したステップ分戻る(ただし、巻き戻し制約限界点に達した場合は限界点まで)

# 巻き戻し限界点まで戻る
raise DagRewind()

# 明示的にモードを指定して巻き戻し限界点まで戻る
raise DagRewind(mode=RewindMode.GO_BACK_TO_LIMIT.value)

# 1ステップ前のタスクに戻る
raise DagRewind(mode=RewindMode.GO_BACK_N_STEPS.value, step=1)


DagTermination例外

当該DAGタスクの処理途中で残りのタスクを全てスキップしてフロー全体を終了させたい場合にスローする例外です。引数はありません。

raise DagTermination()


DagApprovalPending例外

当該DAGタスクを強制的に承認待ちに遷移する例外です。引数はありません。

raise DagApprovalPending()


Note

特殊例外は、v23.2LTSで新たに導入された機能です。


DAGプラグインのサンプル定義

DAGプラグインのイメージを理解するために以下の構成を例に記述します。この構成では最初のステップとして準備タスクを並列で実施した後、メインのタスクを実施し、最後のステップで後処理タスクを並列で行うといった簡単なワークフローです。準備タスクが完了したらメインタスクの開始は承認行為が必要な前提とします。また事前準備以降何らかの例外が発生した場合は、後処理タスクを実行してロールバックするイメージです。

本サンプルで使用するFunctionプラグインを事前に定義します。ここでは簡単にqprint組込関数で自身の関数名を出力するだけのサンプル関数とします。

category: example
name: preparationWorkA
code: |
  def preparationWorkA():
      import sys
      qprint(sys._getframe().f_code.co_name)

===

category: example
name: preparationWorkB
code: |
  def preparationWorkB():
      import sys
      qprint(sys._getframe().f_code.co_name)

===

name: mainWork
category: example
code: |
  def mainWork():
      import sys
      qprint(sys._getframe().f_code.co_name)

===

name: cleanupWorkA
category: example
code: |
  def cleanupWorkA():
      import sys
      qprint(sys._getframe().f_code.co_name)

===

name: cleanupWorkB
category: example
code: |
  def cleanupWorkB():
      import sys
      qprint(sys._getframe().f_code.co_name)


それではDAGプラグインをREPLで作成してみます。尚、通常DAGプラグインは、SDKポータルのGUI上で簡単に作成することができるのでREPLで開発するケースは稀です。SDKポータルでのDAG作成、各種操作はポータルのマニュアルを参照ください。

>>> dag = DAG(category="example", name="example", autoReverseOnFailure=True)↵
... dag.add("prepareA", "preparationWorkA", reverseFuncname="cleanupWorkA", label="準備-A")↵
... dag.add("prepareB", "preparationWorkB", reverseFuncname="cleanupWorkB", label="準備-B")↵
... dag.add("main", "mainWork", dependsOn=["prepareA", "prepareB"], label="メイン", approvalRequired=True)↵
... dag.add("cleanupA", "cleanupWorkA", dependsOn=["main"], label="後処理-A")↵
... dag.add("cleanupB", "cleanupWorkB", dependsOn=["main"], label="後処理-B")↵
... await dag.save()↵
... print(dag.yaml_format)↵
... ↵
↵
autoReverseOnFailure: true
category: example
dag:
  cleanupA:
    approvalRequired: false
    dependsOn:
    - main
    funcname: cleanupWorkA
    label: 後処理-A
    reverseFuncname: null
  cleanupB:
    approvalRequired: false
    dependsOn:
    - main
    funcname: cleanupWorkB
    label: 後処理-B
    reverseFuncname: null
  main:
    approvalRequired: true
    dependsOn:
    - prepareA
    - prepareB
    funcname: mainWork
    label: メイン
    reverseFuncname: null
  prepareA:
    approvalRequired: false
    dependsOn: []
    funcname: preparationWorkA
    label: 準備-A
    reverseFuncname: cleanupWorkA
  prepareB:
    approvalRequired: false
    dependsOn: []
    funcname: preparationWorkB
    label: 準備-B
    reverseFuncname: cleanupWorkB
metadata: {}
name: example
>>>


作成したDAGプラグインからDAGインスタンスを生成し、実行してみます。各Functionプラグインはqprint組込関数で自身の関数名を出力するのでREPLのdebugモードに遷移してから実行してください。

>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel.1']
debug channel connected
>>> dag = await DAG.load("example")↵
... instance = dag.createInstance()↵
... print(instance.status)↵
... ↵
↵
NotRunning
>>> instance.run()↵
... ↵
↵
preparationWorkB
preparationWorkA
>>> print(instance.status)↵
... ↵
↵
ApprovalPending
>>> await instance.approval(["main"])↵
... instance.run()↵
... ↵
↵
mainWork
cleanupWorkB
cleanupWorkA
>>> print(instance.status)↵
... ↵
↵
Done
>>>


DAG操作のための組込オブジェクト

先述したDAGプラグインのサンプル定義の中で登場したDAG操作のための組込オブジェクトについて解説します。

DAG組込クラス

DAG組込クラスは、DAGプラグインの作成、変更、削除、DAGInstanceの生成が可能です。各操作方法を順に解説します。

コンストラクタ

コンストラクタは、categorynamemetadataautoReverseOnFailureの4つのキーワード引数を受け取ります。categoryのデフォルトはNone、nameのデフォルトはUUID、metadataのデフォルトは空の辞書、autoReverseOnFailureのデフォルトはFalseに設定されます。コンストラクタを呼び出しただけではプラグインとして永続化されないことに注意してください。ただし、DAGInstanceを生成して実行することは可能です。

dag = DAG(
    category="example",
    name="example",
    metadata={
        "param1": "hello",
        "param2": "world"
    },
    autoReverseOnFailure=True
)


タスクを追加する

コンストラクタによって作成したDAGクラスのインスタンスが持つグラフにDAGTaskを追加するにはaddメソッドを使用します。第一引数はタスク識別子でDAGが持つグラフ内でタスクを一意に識別できるIDを開発者自身が付与します。第二引数はdownstream方向で実行するFunctionプラグイン名を指定します。以降のキーワード引数はオプションとなり、reverseFuncnameにはupstream方向で実行するFunctionプラグイン名を指定します。dependsOnには、本タスクが依存するタスクのリストを指定します。ここで指定したタスクが全て完了していれば本タスクが実行される動作となります。
labelはユーザが自由に文字列を書き込めるフィールドです。タスクの説明などを記載できます。approvalRequiredは、タスクの実行前にオペレータの承認行為が必要か否かを指定するフラグです。デフォルトはFalseとなっています。Trueに設定した場合、承認待ち状態となり承認行為が行われるまでは当該タスクが実行されることはありません。

dag.add(
    "task1",
    "downstreamFunc",
    reverseFuncname="upstreamFunc",
    dependsOn=[],
    label="1st task",
    approvalRequired=False
)


タスクを修正する

タスクの修正は、updateメソッドを使用します。引数はaddメソッドと完全に一致しています。第一引数で指定したタスク識別子のタスクを完全に置き換えます。

dag.update(
    "task1",
    "downstreamFunc1",
    reverseFuncname="upstreamFunc1",
    dependsOn=["task0"],
    label="1st remote task",
    approvalRequired=True
)


タスクを削除する

タスクの削除は、deleteメソッドを使用します。引数は、タスク識別子のみです。指定されたタスク識別子のタスクをDAGのグラフ構造から除去します。

dag.delete(
    "task1"
)


DAGプラグインを保存する

DAGプラグインを永続化するには、saveメソッドを使用します。引数にはワークスペースを指定してください。

await dag.save(workspace="handson")


DAGプラグインを読み込む

DAGプラグインを永続化領域からロードするには、loadクラスメソッドを使用します。引数にはDAGプラグイン名を指定します。

dag = await DAG.load("example")


DAGプラグインのリストを取得する

DAGプラグインのリストを取得するには、retrieveクラスメソッドを使用します。現状は引数はありません。

dags = await DAG.retrieve()


DAGプラグインを削除する

DAGプラグインを永続化領域から削除するには、destroyメソッドを使用します。引数はありません。

await dag.destroy()


DAGの実行順序を確認する

DAGのワークフロー実行順序はイテレーションすることで確認できます(downstream方向のみ)

>>> for i in dag:↵
...     print(i)↵
... ↵
↵
('prepareA', 'prepareB')
('main',)
('cleanupA', 'cleanupB')
>>>


DAGInstance組込クラス

DAGInstance組込クラスは、DAGプラグインの実行クラスです。DAGワークフローを駆動し、状態を管理します。各操作方法を順に解説します。

DAGInstanceの生成

DAGInstanceは、DAGオブジェクトのcreateInstanceメソッドを使用します。引数はありません。本メソッドを呼び出しただけでは永続化されません。実行した場合は自動的に永続化されます。

dag = await DAG.load("example")
instance = dag.createInstance()


ワークフロー実行

DAGInstanceの実行は、runメソッドを使用します。引数は任意のキーワード引数を受け取ります。指定したキーワード引数はメタデータとして解釈されます。メタデータについては次のセクションで後述します。

instance.run(
    arg1="hello",
    arg2="world"
)


また、ワークフローを巻き戻す場合は、_reverse=Trueキーワード引数を指定してください。後述する承認操作に対して否認する場合はこの巻き戻しを使用します。ワークフロー全体が処理完了してDone状態になっていたとしても_reverse=Trueでの巻き戻し走行が可能です。

instance.run(
    _reverse=True
)


タスクの承認

DAGワークフローは、approvalRequiredTrueに設定されているタスクに到達するとApprovalPending状態で停止します。この状態から先に進めるには承認行為が必要です。承認するためにはapprovalメソッドを使用します。本メソッドの引数は承認するタスクの識別子のリストを指定してください。尚、本メソッドを実行した後、処理を再開するには再度runメソッドを呼び出す必要があります。

await instance.approval(["main"])


承認と再開を同時に行う場合は、以下のようにwithRunキーワード引数にTrueを指定してください。尚、**kwargsを引数に受け取れますので任意のメタデータを承認時に追加することが可能です。

await instance.approval(["main"], withRun=True)


タスクのスキップ

DAGワークフローの走行中にタスクが失敗するとFailed状態に遷移します。通常はエラーとなる状況を何らかの方法で解決したあと、再実行を成功するまで繰り返すか、反転させてロールバックする想定ですが、タスク実行を諦めて前に進める必要がある場合、スキップすることができます。スキップするためにはskipメソッドを使用します。本メソッドの引数はスキップするタスクの識別子のリストを指定してください。尚、本メソッドを実行した後、処理を再開するには再度runメソッドを呼び出す必要があります。

await instance.skip(["main"])


スキップと再開を同時に行う場合は、以下のようにwithRunキーワード引数にTrueを指定してください。尚、**kwargsを引数に受け取れますので任意のメタデータを承認時に追加することが可能です。

await instance.skip(["main"], withRun=True)


状態確認

DAGInstanceの状態は、viewプロパティで確認することができます。

>>> print(json.dumps(instance.view, indent=4))↵
... ↵
↵
{
    "instance": "5cff94207a9e11edbdd2acde48001122",
    "category": "example",
    "name": "example",
    "dag": {
        "cleanupA": {
            "approvalRequired": false,
            "dependsOn": [
                "main"
            ],
            "funcname": "cleanupWorkA",
            "label": "後処理-A",
            "reverseFuncname": null
        },
        "cleanupB": {
            "approvalRequired": false,
            "dependsOn": [
                "main"
            ],
            "funcname": "cleanupWorkB",
            "label": "後処理-B",
            "reverseFuncname": null
        },
        "main": {
            "approvalRequired": true,
            "dependsOn": [
                "prepareA",
                "prepareB"
            ],
            "funcname": "mainWork",
            "label": "メイン",
            "reverseFuncname": null
        },
        "prepareA": {
            "approvalRequired": false,
            "dependsOn": [],
            "funcname": "preparationWorkA",
            "label": "準備-A",
            "reverseFuncname": "cleanupWorkA"
        },
        "prepareB": {
            "approvalRequired": false,
            "dependsOn": [],
            "funcname": "preparationWorkB",
            "label": "準備-B",
            "reverseFuncname": "cleanupWorkB"
        }
    },
    "metadata": {},
    "autoReverseOnFailure": true,
    "status": "ApprovalPending"
}
>>>


上記のviewプロパティはDAGInstanceのワークフロー実行状態が確認できますが、ワークフローに含まれる各タスクの状況は把握できません。タスクの状況も含めて全体を確認したい場合は、statusViewメソッドで情報を取得します。

>>> v = await instance.statusView()↵
... print(json.dumps(v, indent=4))↵
... ↵
↵
{
    "instance": "5cff94207a9e11edbdd2acde48001122",
    "category": "example",
    "name": "example",
    "dag": {
        "cleanupA": {
            "approvalRequired": false,
            "dependsOn": [
                "main"
            ],
            "funcname": "cleanupWorkA",
            "label": "後処理-A",
            "reverseFuncname": null,
            "status": "NotRunning"
        },
        "cleanupB": {
            "approvalRequired": false,
            "dependsOn": [
                "main"
            ],
            "funcname": "cleanupWorkB",
            "label": "後処理-B",
            "reverseFuncname": null,
            "status": "NotRunning"
        },
        "main": {
            "approvalRequired": true,
            "dependsOn": [
                "prepareA",
                "prepareB"
            ],
            "funcname": "mainWork",
            "label": "メイン",
            "reverseFuncname": null,
            "status": "ApprovalPending"
        },
        "prepareA": {
            "approvalRequired": false,
            "dependsOn": [],
            "funcname": "preparationWorkA",
            "label": "準備-A",
            "reverseFuncname": "cleanupWorkA",
            "status": "Done"
        },
        "prepareB": {
            "approvalRequired": false,
            "dependsOn": [],
            "funcname": "preparationWorkB",
            "label": "準備-B",
            "reverseFuncname": "cleanupWorkB",
            "status": "Done"
        }
    },
    "metadata": {},
    "autoReverseOnFailure": true,
    "status": "ApprovalPending"
}
>>>


インスタンスのロード

DAGInstanceをロードする場合は、loadクラスメソッドを使用します。引数には識別子を渡す必要があります。

instance = await DAGInstance.load("5cff94207a9e11edbdd2acde48001122")


インスタンスの一覧

DAGInstanceのリストを取得する場合は、retrieveクラスメソッドを使用します。

instances = await DAGInstance.retrieve()


インスタンスの保存

DAGInstanceの永続化はsaveメソッドを使用します。

await instance.save()


インスタンスの削除

DAGInstanceを永続化領域から削除する場合はdestroyメソッドを使用します。

await instance.destroy()


DAGのメタデータ

DAGワークフローの全てのタスクが共用できるグローバル名前空間としてmetadataが利用できます。各Function/Moduleプラグインの実行時にmetadataに存在する変数が引数として自動的にバインドされます。metadataは、DAG定義時、DAGInstance生成時、DAGInstance走行開始時に指定することが可能で指定された場合、DAGInstanceの持つmetadataとしてマージされて管理されます。以下にmetadataのマージ及び関数バインディングのイメージを記載します。

各タスクの中で生成した変数を後続のタスクで利用したい場合、Function/Moduleプラグインの関数内で辞書オブジェクトを返却するとmetadataにマージされます。
(Functionプラグインの中でatomオブジェクトを使用してDB経由で伝達することももちろん可能です)


Note

DAGのメタデータは基本的に開発者が自由に定義しますが、Qmonus SDKが提供するデフォルトのメタデータも存在します。デフォルトメタデータは、__instance____identifier____reverse__の3つです。__instance__は、DAGInstanceの識別子でDAGのタスク関数の中で自身の呼び元であるDAGInstanceを識別したい場合に引数として受け取ることが可能です。__identifier__は、DAGTaskの識別子でDAGのタスク関数の中で自身のDAGInstance内における識別子を識別したい場合に引数として受け取ることが可能です。__reverse__は、DAGTaskがdownstream方向で実行されているのか、upstream方向で実行されているのかを判定することができるフラグです。Trueの場合、upstream方向で実行されていることになります。これは一つのFunctionプラグインでdownstream/upstream双方向の処理を記述したいケースにおいて利用されることを想定しています。


ワークフローの状態遷移について

DAGInstanceの状態遷移

ワークフローを司るDAGInstanceの状態遷移は以下の通りです。


Tip

DAGワークフローの走行Podが何らかの理由により、ダウンした場合、TransactionマスターサーバによるScenario Pod監視で検出され、自動的にクラッシュリカバリが発動します。クラッシュリカバリはTransactionサーバの起動パラメータで--dag_crash_recovery_mode=Trueを明示的に設定する必要があります。

DAGTaskInstanceの状態遷移

ワークフローの各タスクを司るDAGTaskInstanceの状態遷移は以下の通りです。


Note

SkippedPassedの違いですが、Skippedはオペレータの指示に従ってタスクを実行せず次に進む(つまり下流のタスクは実行される)状態である一方PassedはrunConditionに合致しない場合に自身より下流のタスクを実行しない状態となります。分岐条件に合致しなかったルート上のタスクはPassed状態となります。

チュートリアル

DAGのチュートリアルとして以下の構造を持つワークフローを作成してみます。

Functionプラグインの作成

本来は、task-a〜task-nまで個別の処理を記述することが多いと考えられますが、ここでは簡易化とフロー制御の確認のしやすさを考慮して全て同一のFunctionプラグインを利用する実装とします。

共通的に使用するFunctionプラグインを以下のように記述します。尚、アノテーションの記述は任意です。

category: DAG
name: exampleDagFunction
code: |
  async def exampleDagFunction(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      message: str,
      failIdentifier: str = None,
      delay: int = 0
  ) -> dict:
      """DAG関数のサンプル
      本DAG関数は、DAGワークフローの挙動を学習するためのサンプルです。
      DAG関数として受け取り可能な予約引数としては、__instance__、
      __identifier__、__reverse__の3つがあります。それぞれの用途は以下の
      パラメータ解説をご確認ください。

      本関数では予約引数に加えてmessage、failIdentifier、delayの3つの引数
      を定義しています(引数は自由に定義することができます)。これらの引数に
      はDAG定義時に指定できるmetadata、DAG走行時に指定できるmetadata、各種
      DAG関数が返却した辞書オブジェクトが全てマージされた名前空間から名称
      が一致するものが検索され、パラメータとして渡される仕組みとなっています。

      Parameters:
      __instance__: str
        DAG関数の引数として使用できる予約語でDAGInstanceの識別子を
        受け取りたい場合に指定します(必須ではありません)
      __identifier__: str
        DAG関数の引数として使用できる予約語でDAG関数を参照している
        DAGInstance内のタスク識別子を受け取りたい場合に指定します
        (必須ではありません)
      __reverse__: bool
        DAG関数の引数として使用できる予約語でDAGフローが逆走中に実行
        されているのかをDAG関数内で判定したい場合に指定します
        単一の関数でDAGフローのdownstream/upstreamの両方向の処理を共通化
        したい場合などで利用することを想定しています(必須ではありません)
      message: str
      failIdentifier: str
      delay: int
      """
      import sys
      # delay引数で指定された秒数スリープする
      await asyncio.sleep(delay)
      """
      failIdentifierで指定されたDAGタスクから呼ばれている場合でdownstream方向
      の場合、強制的に例外を発出する
      """
      if failIdentifier == __identifier__ and not __reverse__:
          raise Error(
              500,
              reason=f"Reached force fail identifier task"
          )
      """
      DAGタスク識別子をキーとして処理結果を辞書で返却する
      ここで返却された辞書はDAGInstanceのmetadataにマージされます
      """
      return {
          __identifier__: {
              "funcname": sys._getframe().f_code.co_name,
              "delay": delay,
              "message": f"{message} {__identifier__}",
              "timestamp": clock.now().isoformat(),
              "reverse": __reverse__
          }
      }


DAGプラグインの作成

次にDAGプラグインを以下のように作成します。

category: example
metadata:
  message: hello,
autoReverseOnFailure: true
name: helloDAG
dag:
  task-a:
    approvalRequired: false
    dependsOn: []
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-b:
    approvalRequired: false
    dependsOn: []
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-k:
    approvalRequired: false
    dependsOn:
    - task-e
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-l:
    approvalRequired: false
    dependsOn:
    - task-e
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-m:
    approvalRequired: false
    dependsOn:
    - task-k
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-n:
    approvalRequired: false
    dependsOn:
    - task-m
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-c:
    approvalRequired: false
    dependsOn: []
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-d:
    approvalRequired: false
    dependsOn:
    - task-a
    - task-b
    - task-c
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-e:
    approvalRequired: false
    dependsOn:
    - task-d
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-f:
    approvalRequired: false
    dependsOn:
    - task-d
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-g:
    approvalRequired: true
    dependsOn:
    - task-d
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
  task-h:
    approvalRequired: false
    dependsOn:
    - task-g
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-i:
    approvalRequired: false
    dependsOn:
    - task-g
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null
  task-j:
    approvalRequired: false
    dependsOn:
    - task-h
    funcname: exampleDagFunction
    reverseFuncname: exampleDagFunction
    label: null


DAGワークフローの実行

作成したワークフローを実行します。

>>> dag = await DAG.load("helloDAG")↵
... instance = dag.createInstance()↵
... instance.run()↵
... ↵
↵
>>> instance.status↵
... ↵
↵
ApprovalPending
>>> instance.metadata↵
... ↵
↵
{'message': 'hello,', 'task-a': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-a', 'timestamp': '2022-12-13T14:26:47.105225+09:00', 'reverse': False}, 'task-b': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-b', 'timestamp': '2022-12-13T14:26:47.107193+09:00', 'reverse': False}, 'task-c': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-c', 'timestamp': '2022-12-13T14:26:47.123052+09:00', 'reverse': False}, 'task-d': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-d', 'timestamp': '2022-12-13T14:26:47.135570+09:00', 'reverse': False}, 'task-e': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-e', 'timestamp': '2022-12-13T14:26:47.152861+09:00', 'reverse': False}, 'task-f': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-f', 'timestamp': '2022-12-13T14:26:47.153428+09:00', 'reverse': False}, 'task-k': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-k', 'timestamp': '2022-12-13T14:26:47.180772+09:00', 'reverse': False}, 'task-l': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-l', 'timestamp': '2022-12-13T14:26:47.182448+09:00', 'reverse': False}, 'task-m': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-m', 'timestamp': '2022-12-13T14:26:47.197391+09:00', 'reverse': False}, 'task-n': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-n', 'timestamp': '2022-12-13T14:26:47.209028+09:00', 'reverse': False}}
>>>


ワークフローの承認待ちタスクがあるので承認して再開します。

>>> await instance.approval(["task-g"])↵
... instance.run()↵
... ↵
↵
>>> instance.status↵
... ↵
↵
Done
>>> instance.metadata↵
... ↵
↵
{'message': 'hello,', 'task-a': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-a', 'timestamp': '2022-12-13T14:26:47.105225+09:00', 'reverse': False}, 'task-b': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-b', 'timestamp': '2022-12-13T14:26:47.107193+09:00', 'reverse': False}, 'task-c': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-c', 'timestamp': '2022-12-13T14:26:47.123052+09:00', 'reverse': False}, 'task-d': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-d', 'timestamp': '2022-12-13T14:26:47.135570+09:00', 'reverse': False}, 'task-e': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-e', 'timestamp': '2022-12-13T14:26:47.152861+09:00', 'reverse': False}, 'task-f': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-f', 'timestamp': '2022-12-13T14:26:47.153428+09:00', 'reverse': False}, 'task-k': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-k', 'timestamp': '2022-12-13T14:26:47.180772+09:00', 'reverse': False}, 'task-l': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-l', 'timestamp': '2022-12-13T14:26:47.182448+09:00', 'reverse': False}, 'task-m': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-m', 'timestamp': '2022-12-13T14:26:47.197391+09:00', 'reverse': False}, 'task-n': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-n', 'timestamp': '2022-12-13T14:26:47.209028+09:00', 'reverse': False}, 'task-g': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-g', 'timestamp': '2022-12-13T14:29:12.148878+09:00', 'reverse': False}, 'task-h': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-h', 'timestamp': '2022-12-13T14:29:12.174069+09:00', 'reverse': False}, 'task-i': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-i', 'timestamp': '2022-12-13T14:29:12.174560+09:00', 'reverse': False}, 'task-j': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-j', 'timestamp': '2022-12-13T14:29:12.198975+09:00', 'reverse': False}}
>>>


ワークフローを切り戻します。

>>> instance.run(_reverse=True)↵
... ↵
↵
>>> instance.status↵
... ↵
↵
NotRunning
>>> instance.metadata↵
... ↵
↵
{'message': 'hello,', 'task-a': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-a', 'timestamp': '2022-12-13T14:30:25.011732+09:00', 'reverse': True}, 'task-b': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-b', 'timestamp': '2022-12-13T14:30:25.010712+09:00', 'reverse': True}, 'task-c': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-c', 'timestamp': '2022-12-13T14:30:25.028574+09:00', 'reverse': True}, 'task-d': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-d', 'timestamp': '2022-12-13T14:30:24.995897+09:00', 'reverse': True}, 'task-e': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-e', 'timestamp': '2022-12-13T14:30:24.985179+09:00', 'reverse': True}, 'task-f': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-f', 'timestamp': '2022-12-13T14:30:24.887524+09:00', 'reverse': True}, 'task-k': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-k', 'timestamp': '2022-12-13T14:30:24.967443+09:00', 'reverse': True}, 'task-l': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-l', 'timestamp': '2022-12-13T14:30:24.888207+09:00', 'reverse': True}, 'task-m': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-m', 'timestamp': '2022-12-13T14:30:24.948342+09:00', 'reverse': True}, 'task-n': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-n', 'timestamp': '2022-12-13T14:30:24.907596+09:00', 'reverse': True}, 'task-g': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-g', 'timestamp': '2022-12-13T14:30:24.968165+09:00', 'reverse': True}, 'task-h': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-h', 'timestamp': '2022-12-13T14:30:24.947750+09:00', 'reverse': True}, 'task-i': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-i', 'timestamp': '2022-12-13T14:30:24.926275+09:00', 'reverse': True}, 'task-j': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-j', 'timestamp': '2022-12-13T14:30:24.905412+09:00', 'reverse': True}}
>>>


task-mを失敗させて自動切り戻しします。

>>> i.run(failIdentifier="task-m", delay=1)↵
... await asyncio.sleep(1)↵
... while i.status != "NotRunning":↵
...     i.status↵
...     await asyncio.sleep(1)↵
... i.status↵
... ↵
Running
Running
Running
Running
Running
ReverseRunning
ReverseRunning
ReverseRunning
ReverseRunning
ReverseRunning
ReverseRunning
↵
NotRunning
>>>