DAG (Directed Acyclic Graph)
DAG(Directed Acyclic Graph)は、有向非巡回グラフ型のワークフローモデルです。
DAGは、処理タスク間の依存関係を定義したワークフローで以下のようなイメージになります。この例では、DAGフローが開始されるとtask-aとtask-bが並列実行され、双方が完了するとtask-cが実行されます。task-cが完了するとtask-dとtask-eが並列実行され、task-dが完了するとtask-fを実行して終了します。
DAG管理モデル
DAGプラグインは、DAGオブジェクトとDAGTaskオブジェクトで構成されます。DAGはワークフローで処理するタスクをDAGTaskとしてグラフ構造で管理します。DAGTaskは処理実装への参照(Operator)を保持しています。 タスクの処理実装への参照であるOperatorは、4種類存在します。
-
FunctionPluginOperator
: 特定のFunctionプラグインへの参照 -
ModulePluginOperator
: 特定のModuleプラグインに含まれる特定の関数への参照 -
ThirdPartyPluginOperator
: 特定の持ち込みpythonパッケージ内の特定モジュールに含まれる特定の関数への参照 -
DagPluginOperator
: 特定のDAGプラグインへの参照
DAGオブジェクトは定義オブジェクトであることから状態を保持しません。DAGを実行し、ワークフローを稼働させるにはDAGInstanceオブジェクトを生成します。これはDAGオブジェクトの定義を引き継ぎ、配下に各DAGTaskの実行状態を管理するDAGTaskInstanceを生成します。DAGTaskInstanceはOperatorを通じてタスク処理の実行管理を行います。
従ってDAGInstance生成後にOperatorが参照している処理実装を変更した場合は、最新の処理内容で実行される点に注意が必要です。
管理モデルの概念図を以下に記載しています。
タスクに設定されたOperatorが例外を発出し、失敗した場合にロールバック処理を実行させることができます。ロールバック処理はDAGTaskにロールバックのためのOperatorを指定することで任意の処理を実装できます。
DAGプラグインが保有する属性
属性 | 概要 | 備考 |
---|---|---|
workspace |
ワークスペースを指定します。 | - |
name |
名前を指定します。 | ユニークである必要があります。 |
category |
カテゴリを指定します。 | - |
dag | 構成要素となるタスクの構造定義です。 | - |
dag.identifier |
DAGタスクの識別子を指定します。 | ^[a-zA-Z][a-zA-Z_0-9-]{1,32}$ の正規表現に合致する文字列のみ許容します。また同一DAG内でユニークである必要があります。 |
dag.identifier.dependsOn |
DAGタスクの依存先タスクの識別子のリストを指定します。 | - |
dag.identifier.operator |
DAGタスクで実行する処理を提供するオペレータを定義します。 | - |
dag.identifier.operator.operatorType |
DAGタスクで実行する処理を提供するオペレータの種別を指定します。 | 選択可能な種別は、FunctionPluginOperator 、ModulePluginOperator 、ThirdPartyPluginOperator 、DagPluginOperator のいずれかです。 |
dag.identifier.operator.package |
dag.identifier.operator.operatorType にてThirdPartyPluginOperator を選択した場合に、持ち込みpythonライブラリのpackage名を指定します。 |
- |
dag.identifier.operator.modulename |
dag.identifier.operator.operatorType にてThirdPartyPluginOperator やModulePluginOperator を選択した場合に、持ち込みpythonライブラリ内のモジュール名やModuleプラグイン名を指定します。 |
- |
dag.identifier.operator.funcname |
DAGフローがdownstream方向に稼働している場合に当該タスクが実行する関数を指定します。 | FunctionPluginOperator の場合は、Functionプラグインの名前を指定します。ModulePluginOperator の場合は、Moduleプラグイン内の関数名を指定します。ThirdPartyPluginOperator の場合は、持ち込みpythonライブラリ内の関数名を指定します。 |
dag.identifier.operator.reverseFuncname |
DAGフローがupstream方向に稼働している場合に当該タスクが実行する関数を指定します。 | FunctionPluginOperator の場合は、Functionプラグインの名前を指定します。ModulePluginOperator の場合は、Moduleプラグイン内の関数名を指定します。ThirdPartyPluginOperator の場合は、持ち込みpythonライブラリ内の関数名を指定します。 |
dag.identifier.operator.dagName |
dag.identifier.operator.operatorType にてDagPluginOperator を選択した場合に、実行対象となるDAG名を指定します。 |
本タスクの実行時に新たなDAGインスタンスが生成され、本タスクによって実行管理されます。起動元タスクの実行時点のmetadataが起動先DAGインスタンスに渡されます。起動先DAGインスタンスがmetadataを書き換えても起動元DAGインスタンスには影響しません。 |
dag.identifier.operator.synchronized |
本タスクが実行するDAGインスタンスを同期実行するか、非同期実行するかのフラグを指定します。 デフォルトは True です。 |
True の場合、生成/実行されたDAGインスタンスがDone状態となるまで該当タスクは完了しません。尚、後述するDAGのRewind機能によって該当タスクが巻き戻される場合、生成されたDAGインスタンスはリバース実行されます。巻き戻し後に再実行されると、新たなDAGインスタンスの生成は行わず、既に生成済みのDAGインスタンスが再利用されます。 起動元、起動先のDAGインスタンスは主従関係を持ち、起動元によって起動先のインスタンスは管理されます。また、起動先のDAGインスタンスを直接削除することも許可されません。起動元インスタンスの削除と同時に削除されます。 False の場合、生成/実行されたDAGインスタンスの実行状態とは無関係に該当タスクは完了します。ワークフローの途中で新たなワークフローをフォークするだけの動作となり、DAGインスタンス同士は互いに干渉しません。 |
dag.identifier.label |
当該タスクの注釈や処理概要を記載できるユーザラベルです。 | DAGワークフローの挙動には無関係です。 |
dag.identifier.approvalRequired |
当該タスクの実行に際してユーザからの承認行為が必要か否かを指定するフラグです。 デフォルトは False です。 |
- |
dag.identifier.approvalSchema |
承認待ちタスクの承認時に承認者からの入力を受け付ける場合の入力スキーマを定義できます。 | 入力されたデータは、DAGインスタンスのmetadataにマージされ、タスク処理の中で参照することができます。 |
dag.identifier.skipSchema |
承認待ちタスクをスキップする際に承認者からの入力を受け付ける場合の入力スキーマを定義できます。 | 入力されたデータは、DAGインスタンスのmetadataにマージされ、タスク処理の中で参照することができます。 |
dag.identifier.idempotency |
当該タスクの冪等性の有無を宣言するフラグです。 デフォルトは False です。 |
DAGワークフローが走行中にワークロード破壊が発生するとクラッシュリカバリが作動します。クラッシュリカバリの際に当該タスクの本フラグが True 設定であれば再実行されます。False 設定の場合は再実行せずに Failed 状態に遷移します。 |
dag.identifier.runCondition |
当該タスクが実行可能か否かを判定する条件を指定できるフィールドです。pythonの条件式を記述します。 | 例えば、dryrun==False と記述した場合、当該タスクに到達した時点のmetadataにdryrun パラメータが設定されていて且つ、False の場合にタスクが実行されます。True もしくはdryrun パラメータが未定義の場合は、タスクはパス(Passed)されます。パスされた場合、本タスクに依存している下流のタスクもパスされます。パスされたタスクの下流に存在し、自身の上流タスクが全てパスされている場合パス対象となります。これによってワークフローの分岐を表現することができます。 |
metadata |
DAGワークフローの実行時名前空間の初期値です。 | 任意の辞書を指定できます。DAGタスクで利用するOperatorの関数定義情報から関数の引数を本metadata空間から自動バインドします。 |
autoReverseOnFailure |
DAGワークフローがdownstream走行中にタスク実行が失敗した際に自動的にupstream走行を開始するフラグです。 デフォルトは False です。 |
- |
Note
dag.identifier.operator.operatorType
で設定できる種別ThirdPartyPluginOperator
は、v23.2LTSで新たに導入されたOperatorです。
ThirdPartyPluginOperator
を利用する場合は、持ち込みpythonライブラリのディレクトリをプラグインディレクトリ配下に配置し、SDK起動パラメータの--extension_base_dir
に持ち込みpythonライブラリへのパスを指定する必要があります。
例: --extension_base_dir=/var/plugins/example/myLib
Note
dag.identifier.operator.operatorType
で設定できる種別DagPluginOperator
は、v23.2LTSで新たに導入されたOperatorです。
Rewind機能
DAGワークフローでは有向非巡回グラフで実行順序を決定し、並列実行するため、ループ構造には一定の制約があります。
以下の原則に則り、並列タスクに影響を及ぼさない、且つフローを複雑化させない範囲で部分的にタスクを巻き戻し、反復させることが可能です。
Rewindの原則
- 分岐点へのRewindは、禁止(並列タスクに影響有りのため)
- 分岐点を跨ぐRewindは、禁止(並列タスクに影響有りのため)
- 合流点からのRewindは、禁止(複雑化するため合流前に完結すべき)
- 合流点を跨ぐRewindは、禁止(複雑化するため合流前に完結すべき)
下図では、Rewindの原則に従って各タスクからRewind可能なパタンを点線で表現しています。
Note
Rewindは、v23.2LTSで新たに導入された機能です。
特殊例外
DagRewind例外
当該DAGタスクの処理途中でタスクを巻き戻してやり直したい場合にスローする例外です。
キーワード引数には、mode
とstep
を指定することができます。
Rewindモードは以下の2種類が規定されています。デフォルトはGO_BACK_TO_LIMIT
です。
-
GO_BACK_TO_LIMIT
: 巻き戻し限界点まで戻る -
GO_BACK_N_STEPS
: 指定したステップ分戻る(ただし、巻き戻し制約限界点に達した場合は限界点まで)
# 巻き戻し限界点まで戻る
raise DagRewind()
# 明示的にモードを指定して巻き戻し限界点まで戻る
raise DagRewind(mode=RewindMode.GO_BACK_TO_LIMIT.value)
# 1ステップ前のタスクに戻る
raise DagRewind(mode=RewindMode.GO_BACK_N_STEPS.value, step=1)
DagTermination例外
当該DAGタスクの処理途中で残りのタスクを全てスキップしてフロー全体を終了させたい場合にスローする例外です。引数はありません。
raise DagTermination()
DagApprovalPending例外
当該DAGタスクを強制的に承認待ちに遷移する例外です。引数はありません。
raise DagApprovalPending()
Note
特殊例外は、v23.2LTSで新たに導入された機能です。
DAGプラグインのサンプル定義
DAGプラグインのイメージを理解するために以下の構成を例に記述します。この構成では最初のステップとして準備タスクを並列で実施した後、メインのタスクを実施し、最後のステップで後処理タスクを並列で行うといった簡単なワークフローです。準備タスクが完了したらメインタスクの開始は承認行為が必要な前提とします。また事前準備以降何らかの例外が発生した場合は、後処理タスクを実行してロールバックするイメージです。
本サンプルで使用するFunctionプラグインを事前に定義します。ここでは簡単にqprint組込関数で自身の関数名を出力するだけのサンプル関数とします。
category: example
name: preparationWorkA
code: |
def preparationWorkA():
import sys
qprint(sys._getframe().f_code.co_name)
===
category: example
name: preparationWorkB
code: |
def preparationWorkB():
import sys
qprint(sys._getframe().f_code.co_name)
===
name: mainWork
category: example
code: |
def mainWork():
import sys
qprint(sys._getframe().f_code.co_name)
===
name: cleanupWorkA
category: example
code: |
def cleanupWorkA():
import sys
qprint(sys._getframe().f_code.co_name)
===
name: cleanupWorkB
category: example
code: |
def cleanupWorkB():
import sys
qprint(sys._getframe().f_code.co_name)
それではDAGプラグインをREPLで作成してみます。尚、通常DAGプラグインは、SDKポータルのGUI上で簡単に作成することができるのでREPLで開発するケースは稀です。SDKポータルでのDAG作成、各種操作はポータルのマニュアルを参照ください。
>>> dag = DAG(category="example", name="example", autoReverseOnFailure=True)↵
... dag.add("prepareA", "preparationWorkA", reverseFuncname="cleanupWorkA", label="準備-A")↵
... dag.add("prepareB", "preparationWorkB", reverseFuncname="cleanupWorkB", label="準備-B")↵
... dag.add("main", "mainWork", dependsOn=["prepareA", "prepareB"], label="メイン", approvalRequired=True)↵
... dag.add("cleanupA", "cleanupWorkA", dependsOn=["main"], label="後処理-A")↵
... dag.add("cleanupB", "cleanupWorkB", dependsOn=["main"], label="後処理-B")↵
... await dag.save()↵
... print(dag.yaml_format)↵
... ↵
↵
autoReverseOnFailure: true
category: example
dag:
cleanupA:
approvalRequired: false
dependsOn:
- main
funcname: cleanupWorkA
label: 後処理-A
reverseFuncname: null
cleanupB:
approvalRequired: false
dependsOn:
- main
funcname: cleanupWorkB
label: 後処理-B
reverseFuncname: null
main:
approvalRequired: true
dependsOn:
- prepareA
- prepareB
funcname: mainWork
label: メイン
reverseFuncname: null
prepareA:
approvalRequired: false
dependsOn: []
funcname: preparationWorkA
label: 準備-A
reverseFuncname: cleanupWorkA
prepareB:
approvalRequired: false
dependsOn: []
funcname: preparationWorkB
label: 準備-B
reverseFuncname: cleanupWorkB
metadata: {}
name: example
>>>
作成したDAGプラグインからDAGインスタンスを生成し、実行してみます。各Functionプラグインはqprint
組込関数で自身の関数名を出力するのでREPLのdebugモードに遷移してから実行してください。
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel.1']
debug channel connected
>>> dag = await DAG.load("example")↵
... instance = dag.createInstance()↵
... print(instance.status)↵
... ↵
↵
NotRunning
>>> instance.run()↵
... ↵
↵
preparationWorkB
preparationWorkA
>>> print(instance.status)↵
... ↵
↵
ApprovalPending
>>> await instance.approval(["main"])↵
... instance.run()↵
... ↵
↵
mainWork
cleanupWorkB
cleanupWorkA
>>> print(instance.status)↵
... ↵
↵
Done
>>>
DAG操作のための組込オブジェクト
先述したDAGプラグインのサンプル定義の中で登場したDAG操作のための組込オブジェクトについて解説します。
DAG組込クラス
DAG組込クラスは、DAGプラグインの作成、変更、削除、DAGInstanceの生成が可能です。各操作方法を順に解説します。
コンストラクタ
コンストラクタは、category
、name
、metadata
、autoReverseOnFailure
の4つのキーワード引数を受け取ります。categoryのデフォルトはNone
、nameのデフォルトはUUID、metadataのデフォルトは空の辞書、autoReverseOnFailureのデフォルトはFalse
に設定されます。コンストラクタを呼び出しただけではプラグインとして永続化されないことに注意してください。ただし、DAGInstanceを生成して実行することは可能です。
dag = DAG(
category="example",
name="example",
metadata={
"param1": "hello",
"param2": "world"
},
autoReverseOnFailure=True
)
タスクを追加する
コンストラクタによって作成したDAGクラスのインスタンスが持つグラフにDAGTaskを追加するにはadd
メソッドを使用します。第一引数はタスク識別子でDAGが持つグラフ内でタスクを一意に識別できるIDを開発者自身が付与します。第二引数はdownstream方向で実行するFunctionプラグイン名を指定します。以降のキーワード引数はオプションとなり、reverseFuncname
にはupstream方向で実行するFunctionプラグイン名を指定します。dependsOn
には、本タスクが依存するタスクのリストを指定します。ここで指定したタスクが全て完了していれば本タスクが実行される動作となります。
label
はユーザが自由に文字列を書き込めるフィールドです。タスクの説明などを記載できます。approvalRequired
は、タスクの実行前にオペレータの承認行為が必要か否かを指定するフラグです。デフォルトはFalse
となっています。True
に設定した場合、承認待ち状態となり承認行為が行われるまでは当該タスクが実行されることはありません。
dag.add(
"task1",
"downstreamFunc",
reverseFuncname="upstreamFunc",
dependsOn=[],
label="1st task",
approvalRequired=False
)
タスクを修正する
タスクの修正は、update
メソッドを使用します。引数はadd
メソッドと完全に一致しています。第一引数で指定したタスク識別子のタスクを完全に置き換えます。
dag.update(
"task1",
"downstreamFunc1",
reverseFuncname="upstreamFunc1",
dependsOn=["task0"],
label="1st remote task",
approvalRequired=True
)
タスクを削除する
タスクの削除は、delete
メソッドを使用します。引数は、タスク識別子のみです。指定されたタスク識別子のタスクをDAGのグラフ構造から除去します。
dag.delete(
"task1"
)
DAGプラグインを保存する
DAGプラグインを永続化するには、save
メソッドを使用します。引数にはワークスペースを指定してください。
await dag.save(workspace="handson")
DAGプラグインを読み込む
DAGプラグインを永続化領域からロードするには、load
クラスメソッドを使用します。引数にはDAGプラグイン名を指定します。
dag = await DAG.load("example")
DAGプラグインのリストを取得する
DAGプラグインのリストを取得するには、retrieve
クラスメソッドを使用します。現状は引数はありません。
dags = await DAG.retrieve()
DAGプラグインを削除する
DAGプラグインを永続化領域から削除するには、destroy
メソッドを使用します。引数はありません。
await dag.destroy()
DAGの実行順序を確認する
DAGのワークフロー実行順序はイテレーションすることで確認できます(downstream方向のみ)
>>> for i in dag:↵
... print(i)↵
... ↵
↵
('prepareA', 'prepareB')
('main',)
('cleanupA', 'cleanupB')
>>>
DAGInstance組込クラス
DAGInstance組込クラスは、DAGプラグインの実行クラスです。DAGワークフローを駆動し、状態を管理します。各操作方法を順に解説します。
DAGInstanceの生成
DAGInstanceは、DAGオブジェクトのcreateInstance
メソッドを使用します。引数はありません。本メソッドを呼び出しただけでは永続化されません。実行した場合は自動的に永続化されます。
dag = await DAG.load("example")
instance = dag.createInstance()
ワークフロー実行
DAGInstanceの実行は、run
メソッドを使用します。引数は任意のキーワード引数を受け取ります。指定したキーワード引数はメタデータとして解釈されます。メタデータについては次のセクションで後述します。
instance.run(
arg1="hello",
arg2="world"
)
また、ワークフローを巻き戻す場合は、_reverse=True
キーワード引数を指定してください。後述する承認操作に対して否認する場合はこの巻き戻しを使用します。ワークフロー全体が処理完了してDone
状態になっていたとしても_reverse=True
での巻き戻し走行が可能です。
instance.run(
_reverse=True
)
タスクの承認
DAGワークフローは、approvalRequired
がTrue
に設定されているタスクに到達するとApprovalPending
状態で停止します。この状態から先に進めるには承認行為が必要です。承認するためにはapproval
メソッドを使用します。本メソッドの引数は承認するタスクの識別子のリストを指定してください。尚、本メソッドを実行した後、処理を再開するには再度run
メソッドを呼び出す必要があります。
await instance.approval(["main"])
承認と再開を同時に行う場合は、以下のようにwithRun
キーワード引数にTrue
を指定してください。尚、**kwargsを引数に受け取れますので任意のメタデータを承認時に追加することが可能です。
await instance.approval(["main"], withRun=True)
タスクのスキップ
DAGワークフローの走行中にタスクが失敗するとFailed
状態に遷移します。通常はエラーとなる状況を何らかの方法で解決したあと、再実行を成功するまで繰り返すか、反転させてロールバックする想定ですが、タスク実行を諦めて前に進める必要がある場合、スキップすることができます。スキップするためにはskip
メソッドを使用します。本メソッドの引数はスキップするタスクの識別子のリストを指定してください。尚、本メソッドを実行した後、処理を再開するには再度run
メソッドを呼び出す必要があります。
await instance.skip(["main"])
スキップと再開を同時に行う場合は、以下のようにwithRun
キーワード引数にTrue
を指定してください。尚、**kwargsを引数に受け取れますので任意のメタデータを承認時に追加することが可能です。
await instance.skip(["main"], withRun=True)
状態確認
DAGInstanceの状態は、view
プロパティで確認することができます。
>>> print(json.dumps(instance.view, indent=4))↵
... ↵
↵
{
"instance": "5cff94207a9e11edbdd2acde48001122",
"category": "example",
"name": "example",
"dag": {
"cleanupA": {
"approvalRequired": false,
"dependsOn": [
"main"
],
"funcname": "cleanupWorkA",
"label": "後処理-A",
"reverseFuncname": null
},
"cleanupB": {
"approvalRequired": false,
"dependsOn": [
"main"
],
"funcname": "cleanupWorkB",
"label": "後処理-B",
"reverseFuncname": null
},
"main": {
"approvalRequired": true,
"dependsOn": [
"prepareA",
"prepareB"
],
"funcname": "mainWork",
"label": "メイン",
"reverseFuncname": null
},
"prepareA": {
"approvalRequired": false,
"dependsOn": [],
"funcname": "preparationWorkA",
"label": "準備-A",
"reverseFuncname": "cleanupWorkA"
},
"prepareB": {
"approvalRequired": false,
"dependsOn": [],
"funcname": "preparationWorkB",
"label": "準備-B",
"reverseFuncname": "cleanupWorkB"
}
},
"metadata": {},
"autoReverseOnFailure": true,
"status": "ApprovalPending"
}
>>>
上記のview
プロパティはDAGInstanceのワークフロー実行状態が確認できますが、ワークフローに含まれる各タスクの状況は把握できません。タスクの状況も含めて全体を確認したい場合は、statusView
メソッドで情報を取得します。
>>> v = await instance.statusView()↵
... print(json.dumps(v, indent=4))↵
... ↵
↵
{
"instance": "5cff94207a9e11edbdd2acde48001122",
"category": "example",
"name": "example",
"dag": {
"cleanupA": {
"approvalRequired": false,
"dependsOn": [
"main"
],
"funcname": "cleanupWorkA",
"label": "後処理-A",
"reverseFuncname": null,
"status": "NotRunning"
},
"cleanupB": {
"approvalRequired": false,
"dependsOn": [
"main"
],
"funcname": "cleanupWorkB",
"label": "後処理-B",
"reverseFuncname": null,
"status": "NotRunning"
},
"main": {
"approvalRequired": true,
"dependsOn": [
"prepareA",
"prepareB"
],
"funcname": "mainWork",
"label": "メイン",
"reverseFuncname": null,
"status": "ApprovalPending"
},
"prepareA": {
"approvalRequired": false,
"dependsOn": [],
"funcname": "preparationWorkA",
"label": "準備-A",
"reverseFuncname": "cleanupWorkA",
"status": "Done"
},
"prepareB": {
"approvalRequired": false,
"dependsOn": [],
"funcname": "preparationWorkB",
"label": "準備-B",
"reverseFuncname": "cleanupWorkB",
"status": "Done"
}
},
"metadata": {},
"autoReverseOnFailure": true,
"status": "ApprovalPending"
}
>>>
インスタンスのロード
DAGInstanceをロードする場合は、load
クラスメソッドを使用します。引数には識別子を渡す必要があります。
instance = await DAGInstance.load("5cff94207a9e11edbdd2acde48001122")
インスタンスの一覧
DAGInstanceのリストを取得する場合は、retrieve
クラスメソッドを使用します。
instances = await DAGInstance.retrieve()
インスタンスの保存
DAGInstanceの永続化はsave
メソッドを使用します。
await instance.save()
インスタンスの削除
DAGInstanceを永続化領域から削除する場合はdestroy
メソッドを使用します。
await instance.destroy()
DAGのメタデータ
DAGワークフローの全てのタスクが共用できるグローバル名前空間としてmetadata
が利用できます。各Function/Moduleプラグインの実行時にmetadataに存在する変数が引数として自動的にバインドされます。metadataは、DAG定義時、DAGInstance生成時、DAGInstance走行開始時に指定することが可能で指定された場合、DAGInstanceの持つmetadataとしてマージされて管理されます。以下にmetadataのマージ及び関数バインディングのイメージを記載します。
各タスクの中で生成した変数を後続のタスクで利用したい場合、Function/Moduleプラグインの関数内で辞書オブジェクトを返却するとmetadataにマージされます。
(Functionプラグインの中でatomオブジェクトを使用してDB経由で伝達することももちろん可能です)
Note
DAGのメタデータは基本的に開発者が自由に定義しますが、Qmonus SDKが提供するデフォルトのメタデータも存在します。デフォルトメタデータは、__instance__
、__identifier__
、__reverse__
の3つです。__instance__
は、DAGInstanceの識別子でDAGのタスク関数の中で自身の呼び元であるDAGInstanceを識別したい場合に引数として受け取ることが可能です。__identifier__
は、DAGTaskの識別子でDAGのタスク関数の中で自身のDAGInstance内における識別子を識別したい場合に引数として受け取ることが可能です。__reverse__
は、DAGTaskがdownstream方向で実行されているのか、upstream方向で実行されているのかを判定することができるフラグです。True
の場合、upstream方向で実行されていることになります。これは一つのFunctionプラグインでdownstream/upstream双方向の処理を記述したいケースにおいて利用されることを想定しています。
ワークフローの状態遷移について
DAGInstanceの状態遷移
ワークフローを司るDAGInstanceの状態遷移は以下の通りです。
Tip
DAGワークフローの走行Podが何らかの理由により、ダウンした場合、TransactionマスターサーバによるScenario Pod監視で検出され、自動的にクラッシュリカバリが発動します。クラッシュリカバリはTransactionサーバの起動パラメータで--dag_crash_recovery_mode=True
を明示的に設定する必要があります。
DAGTaskInstanceの状態遷移
ワークフローの各タスクを司るDAGTaskInstanceの状態遷移は以下の通りです。
Note
Skipped
とPassed
の違いですが、Skipped
はオペレータの指示に従ってタスクを実行せず次に進む(つまり下流のタスクは実行される)状態である一方Passed
はrunConditionに合致しない場合に自身より下流のタスクを実行しない状態となります。分岐条件に合致しなかったルート上のタスクはPassed
状態となります。
チュートリアル
DAGのチュートリアルとして以下の構造を持つワークフローを作成してみます。
Functionプラグインの作成
本来は、task-a〜task-nまで個別の処理を記述することが多いと考えられますが、ここでは簡易化とフロー制御の確認のしやすさを考慮して全て同一のFunctionプラグインを利用する実装とします。
共通的に使用するFunctionプラグインを以下のように記述します。尚、アノテーションの記述は任意です。
category: DAG
name: exampleDagFunction
code: |
async def exampleDagFunction(
__instance__: str,
__identifier__: str,
__reverse__: bool,
message: str,
failIdentifier: str = None,
delay: int = 0
) -> dict:
"""DAG関数のサンプル
本DAG関数は、DAGワークフローの挙動を学習するためのサンプルです。
DAG関数として受け取り可能な予約引数としては、__instance__、
__identifier__、__reverse__の3つがあります。それぞれの用途は以下の
パラメータ解説をご確認ください。
本関数では予約引数に加えてmessage、failIdentifier、delayの3つの引数
を定義しています(引数は自由に定義することができます)。これらの引数に
はDAG定義時に指定できるmetadata、DAG走行時に指定できるmetadata、各種
DAG関数が返却した辞書オブジェクトが全てマージされた名前空間から名称
が一致するものが検索され、パラメータとして渡される仕組みとなっています。
Parameters:
__instance__: str
DAG関数の引数として使用できる予約語でDAGInstanceの識別子を
受け取りたい場合に指定します(必須ではありません)
__identifier__: str
DAG関数の引数として使用できる予約語でDAG関数を参照している
DAGInstance内のタスク識別子を受け取りたい場合に指定します
(必須ではありません)
__reverse__: bool
DAG関数の引数として使用できる予約語でDAGフローが逆走中に実行
されているのかをDAG関数内で判定したい場合に指定します
単一の関数でDAGフローのdownstream/upstreamの両方向の処理を共通化
したい場合などで利用することを想定しています(必須ではありません)
message: str
failIdentifier: str
delay: int
"""
import sys
# delay引数で指定された秒数スリープする
await asyncio.sleep(delay)
"""
failIdentifierで指定されたDAGタスクから呼ばれている場合でdownstream方向
の場合、強制的に例外を発出する
"""
if failIdentifier == __identifier__ and not __reverse__:
raise Error(
500,
reason=f"Reached force fail identifier task"
)
"""
DAGタスク識別子をキーとして処理結果を辞書で返却する
ここで返却された辞書はDAGInstanceのmetadataにマージされます
"""
return {
__identifier__: {
"funcname": sys._getframe().f_code.co_name,
"delay": delay,
"message": f"{message} {__identifier__}",
"timestamp": clock.now().isoformat(),
"reverse": __reverse__
}
}
DAGプラグインの作成
次にDAGプラグインを以下のように作成します。
category: example
metadata:
message: hello,
autoReverseOnFailure: true
name: helloDAG
dag:
task-a:
approvalRequired: false
dependsOn: []
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-b:
approvalRequired: false
dependsOn: []
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-k:
approvalRequired: false
dependsOn:
- task-e
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-l:
approvalRequired: false
dependsOn:
- task-e
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-m:
approvalRequired: false
dependsOn:
- task-k
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-n:
approvalRequired: false
dependsOn:
- task-m
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-c:
approvalRequired: false
dependsOn: []
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-d:
approvalRequired: false
dependsOn:
- task-a
- task-b
- task-c
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-e:
approvalRequired: false
dependsOn:
- task-d
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-f:
approvalRequired: false
dependsOn:
- task-d
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-g:
approvalRequired: true
dependsOn:
- task-d
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
task-h:
approvalRequired: false
dependsOn:
- task-g
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-i:
approvalRequired: false
dependsOn:
- task-g
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
task-j:
approvalRequired: false
dependsOn:
- task-h
funcname: exampleDagFunction
reverseFuncname: exampleDagFunction
label: null
DAGワークフローの実行
作成したワークフローを実行します。
>>> dag = await DAG.load("helloDAG")↵
... instance = dag.createInstance()↵
... instance.run()↵
... ↵
↵
>>> instance.status↵
... ↵
↵
ApprovalPending
>>> instance.metadata↵
... ↵
↵
{'message': 'hello,', 'task-a': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-a', 'timestamp': '2022-12-13T14:26:47.105225+09:00', 'reverse': False}, 'task-b': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-b', 'timestamp': '2022-12-13T14:26:47.107193+09:00', 'reverse': False}, 'task-c': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-c', 'timestamp': '2022-12-13T14:26:47.123052+09:00', 'reverse': False}, 'task-d': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-d', 'timestamp': '2022-12-13T14:26:47.135570+09:00', 'reverse': False}, 'task-e': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-e', 'timestamp': '2022-12-13T14:26:47.152861+09:00', 'reverse': False}, 'task-f': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-f', 'timestamp': '2022-12-13T14:26:47.153428+09:00', 'reverse': False}, 'task-k': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-k', 'timestamp': '2022-12-13T14:26:47.180772+09:00', 'reverse': False}, 'task-l': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-l', 'timestamp': '2022-12-13T14:26:47.182448+09:00', 'reverse': False}, 'task-m': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-m', 'timestamp': '2022-12-13T14:26:47.197391+09:00', 'reverse': False}, 'task-n': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-n', 'timestamp': '2022-12-13T14:26:47.209028+09:00', 'reverse': False}}
>>>
ワークフローの承認待ちタスクがあるので承認して再開します。
>>> await instance.approval(["task-g"])↵
... instance.run()↵
... ↵
↵
>>> instance.status↵
... ↵
↵
Done
>>> instance.metadata↵
... ↵
↵
{'message': 'hello,', 'task-a': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-a', 'timestamp': '2022-12-13T14:26:47.105225+09:00', 'reverse': False}, 'task-b': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-b', 'timestamp': '2022-12-13T14:26:47.107193+09:00', 'reverse': False}, 'task-c': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-c', 'timestamp': '2022-12-13T14:26:47.123052+09:00', 'reverse': False}, 'task-d': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-d', 'timestamp': '2022-12-13T14:26:47.135570+09:00', 'reverse': False}, 'task-e': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-e', 'timestamp': '2022-12-13T14:26:47.152861+09:00', 'reverse': False}, 'task-f': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-f', 'timestamp': '2022-12-13T14:26:47.153428+09:00', 'reverse': False}, 'task-k': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-k', 'timestamp': '2022-12-13T14:26:47.180772+09:00', 'reverse': False}, 'task-l': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-l', 'timestamp': '2022-12-13T14:26:47.182448+09:00', 'reverse': False}, 'task-m': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-m', 'timestamp': '2022-12-13T14:26:47.197391+09:00', 'reverse': False}, 'task-n': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-n', 'timestamp': '2022-12-13T14:26:47.209028+09:00', 'reverse': False}, 'task-g': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-g', 'timestamp': '2022-12-13T14:29:12.148878+09:00', 'reverse': False}, 'task-h': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-h', 'timestamp': '2022-12-13T14:29:12.174069+09:00', 'reverse': False}, 'task-i': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-i', 'timestamp': '2022-12-13T14:29:12.174560+09:00', 'reverse': False}, 'task-j': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-j', 'timestamp': '2022-12-13T14:29:12.198975+09:00', 'reverse': False}}
>>>
ワークフローを切り戻します。
>>> instance.run(_reverse=True)↵
... ↵
↵
>>> instance.status↵
... ↵
↵
NotRunning
>>> instance.metadata↵
... ↵
↵
{'message': 'hello,', 'task-a': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-a', 'timestamp': '2022-12-13T14:30:25.011732+09:00', 'reverse': True}, 'task-b': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-b', 'timestamp': '2022-12-13T14:30:25.010712+09:00', 'reverse': True}, 'task-c': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-c', 'timestamp': '2022-12-13T14:30:25.028574+09:00', 'reverse': True}, 'task-d': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-d', 'timestamp': '2022-12-13T14:30:24.995897+09:00', 'reverse': True}, 'task-e': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-e', 'timestamp': '2022-12-13T14:30:24.985179+09:00', 'reverse': True}, 'task-f': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-f', 'timestamp': '2022-12-13T14:30:24.887524+09:00', 'reverse': True}, 'task-k': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-k', 'timestamp': '2022-12-13T14:30:24.967443+09:00', 'reverse': True}, 'task-l': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-l', 'timestamp': '2022-12-13T14:30:24.888207+09:00', 'reverse': True}, 'task-m': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-m', 'timestamp': '2022-12-13T14:30:24.948342+09:00', 'reverse': True}, 'task-n': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-n', 'timestamp': '2022-12-13T14:30:24.907596+09:00', 'reverse': True}, 'task-g': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-g', 'timestamp': '2022-12-13T14:30:24.968165+09:00', 'reverse': True}, 'task-h': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-h', 'timestamp': '2022-12-13T14:30:24.947750+09:00', 'reverse': True}, 'task-i': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-i', 'timestamp': '2022-12-13T14:30:24.926275+09:00', 'reverse': True}, 'task-j': {'funcname': 'exampleDagFunction', 'delay': 0, 'message': 'hello, task-j', 'timestamp': '2022-12-13T14:30:24.905412+09:00', 'reverse': True}}
>>>
task-mを失敗させて自動切り戻しします。
>>> i.run(failIdentifier="task-m", delay=1)↵
... await asyncio.sleep(1)↵
... while i.status != "NotRunning":↵
... i.status↵
... await asyncio.sleep(1)↵
... i.status↵
... ↵
Running
Running
Running
Running
Running
ReverseRunning
ReverseRunning
ReverseRunning
ReverseRunning
ReverseRunning
ReverseRunning
↵
NotRunning
>>>