DAG開発チュートリアル
はじめに
このチュートリアルでは、Qmonus SDKのScenarioサービスの機能であるDAG
の開発を体験します。
DAGを使用したユースケースの一つであるZTO(Zero Touch Operation)
を題材に取り上げ、実際にGoogle CloudのComputeEngineインスタンスに障害が起きた時の障害対応、復旧作業を行うデモシステムを開発します。
DAGとは?
DAG(Directed Acyclic Graph)
は、有向非巡回グラフ型のワークフローモデルです。
DAGは、処理タスク間の依存関係を定義したワークフローで以下のようなイメージになります。この例では、DAGフローが開始されるとtask-a
とtask-b
が並列実行され、双方が完了するとtask-c
が実行されます。task-c
が完了するとtask-d
とtask-e
が並列実行され、task-d
が完了するとtask-f
を実行して終了します。
API開発チュートリアルで実装したScenarioとの違いに関しては以下の2点になります。
- 有向非巡回グラフ型 であること
- Scenarioは定義されたToyblockが順番通りに処理されるが、DAGは複数のタスクを並列に処理させることができる
- APIでは直接呼び出せない
- ScenarioはAPI pathを定義し、API GatewayにRoutingを設定することで、APIから呼び出せます。
- DAGはAPIの窓口を持たないため、ScenarioやFunctionの中でSDKの組み込み関数から呼び出します。
ZTOとは?
本チュートリアルの題材となるZTOとはNTTドコモがサービスの保守(故障復旧)業務の完全自動化を目指す仕組みです。
現在、多くのシステムの保守業務は以下のように人手を介した保守業務が中心です。
イメージ図では一連の流れとして表現されていますが、実際の障害対応においては、複数の宛先への報告が必要となるほか、原因調査においても多角的な視点からの切り分けが求められます。
そのため、一つの業務であっても、実際には多くのタスクが人手によって処理されているのが現状です。
この業務を全て自動化するというのがZTOの目標となります。
ZTOの実現に向けて、Qmonus SDKのDAGが以下のように用いられています。
システムの障害点検で使用されているのはQmonus SDKのCLA
(Closed Loop Automation)という機能です。
CLAによって検知された障害に対してあらかじめ作成したDAGのワークフローを走らせることで、人手を介さず障害対応を実施できます。
原因調査など複数のタスクの並列処理や、復旧作業後にTicket更新、復旧報告を行うなどタスク間に依存関係がある処理を実行するのにDAGは適しています。
チュートリアルを進める上での準備(前提)
本チュートリアルではAPI開発チュートリアルで開発したSample CRUD APIを一部使用しているため、事前にAPI開発チュートリアルを実施してください。
本チュートリアルでは、Google Cloud の Compute Engine インスタンスに対する障害対応を題材としています。 そのため、以下の操作が可能であれば、より実践的な内容として取り組むことができます(※ただし、これらができなくてもチュートリアルの単体試験までは進行可能です)。
- Google CloudでComputeEngineの権限を持つサービスアカウントを作成している。
- Google CloudのCompute Engine APIを用いてCompute EngineインスタンスをCRUDすることができる。
- SlackのAppを作成し、WorkSpaceにインストールできる。
チュートリアルのシチュエーション
本チュートリアルでは、ZTOの考え方を取り入れた、障害対応自動化システムの開発を題材としたシナリオに基づき、デモシステムを構築します。
本シナリオには、以下の登場人物が存在します。
- ユーザー
- GCE上でVMインスタンスを利用しています。障害が発生するとシステム停止などの影響を受ける可能性があります。
- オペレーター
- ユーザーが使用している GCE の VM インスタンスを保守しています。障害発生時には、ユーザーへの報告や原因の切り分けなど、複数の手作業を伴う業務を行います。
- 読者(あなた)
- ZTOを実現するため、オペレーターの手作業を削減・自動化する保守支援システムの開発に取り組みます。
デモシステムの実装
ここから障害復旧システムの実装に進みます。
実装内容確認
本チュートリアルで実装する障害復旧システムのアーキテクチャは以下になります。
-
ATOM
SDK内でのVMインスタンスの管理にはAPI開発チュートリアルで作成したVmInstanceを使用します。
本チュートリアルでは、障害対応時のチケット管理を模擬するため、Ticket情報を管理する ATOM を追加で作成します。 -
Config
SDK内部からCompute EngineのAPIを実行するにはサービスアカウントキー情報をConfigに保管する必要があります。
こちらはAPI開発チュートリアルのものを使用するため実装は省略します。 -
API Gateway
API GWにRoutingを設定します。
Routing設定により、各ScenarioやDAGで実行されたAPIを、API GWがGoogle Cooudに対して適切にRoutingします。 -
Scenario
本チュートリアルでは以下2つのScenarioを実装します。- GCE(Google Compute Engine)のVMインスタンス情報を取得するScenario
- GCEのVMインスタンスの障害を検知し、DAGを呼び出して、障害対応を行うScenario
-
DAG
本チュートリアルでは以下のDAGを実装します- Scenarioからの呼び出しで、ユーザーや オペレーターへの対応及び、検知された障害への対応を行うDAG
障害検知から障害対応までのデモシステムの動作イメージ
障害検知
本チュートリアルで実装するデモシステムの障害検知から障害対応における動作イメージを説明します。
前提としてオペレータはGoogle Cloud上にVMインスタンスを作成していて、SDK内のATOM
にも対応するVMインスタンス情報が管理されています。
まずオペレーターはGCEのインスタンスに障害が発生していないか確認するため、検知Scenario
を実行します。(イメージ図の簡略化のためAPI GW
は省略しています。)
検知Scenario
はGCEからVMインスタンス情報を取得するScenario
を利用してVM情報を取得します。
また、ATOM
が保持している対象のVM情報を取得します。
ここで、GCEからVM情報の取得ができない場合や、GCEとSDK内のVM情報に誤差があった場合、障害として検知します。
本チュートリアルで扱う障害は以下3点になります。
- SDK内ではVM情報が存在するが、Compute Engine上には存在しない場合
- Compute Engineのシステム自体に障害が発生し、インスタンス情報が取得できない
GCE自体に障害がある場合、インスタンス情報を取得するScenarioはAPIのレスポンスを確認できず、検知Scenarioにタイムアウト処理によるレスポンスを返却する。
- GCEとSDK内のインスタンス情報(MachineType)が異なる
障害対応
障害を検知したScenario
は障害対応を行うためDAG
のインスタンスを作成し、復旧に向けてDAG
のワークフローを実行します。
ワークフローではオペレーターやユーザへの通知、Ticket作成、障害復旧などを行います。(詳細はDAGの実装に記載)
各障害への復旧(対応)作業は以下になります。
- SDK内ではVM情報が存在するが、Compute Engine上には存在しない場合
SDK内のインスタンス情報を参考に、CREATE API(API開発チュートリアルで作成済み)でGCE上にインスタンス作成
- Compute Engineのシステム自体に障害が発生し、インスタンス情報を取得できない
ATOMのインスタンス情報にGCEシステムの障害の旨を記録することで対応
- GCEとSDK内のインスタンス情報(MachineType)が異なる
SDK内のインスタンス情報を参考に、UPDATE API(API開発チュートリアルで作成済)でGCEのインスタンス情報を更新
上記が本チュートリアルで実装するデモシステムの動作イメージになります。
ATOMの実装
実装内容の確認
本チュートリアルでは以下の2つのATOM
を実装します。
- VmInstance(API開発チュートリアルで実装済)
- GCEのインスタンス情報を所持している。
- TicketInstance
- 本チュートリアルで実装。
- インスタンス障害検知時に、障害に関する情報を記録し管理するためのチケットを模擬したもの。
VmInstanceはすでにAPI開発チュートリアルで実装されているため、本チュートリアルではTicketInstanceを実装します。
ATOMの定義
クラスの作成
[Workflow Scenario as a Service]>[Class]>[Create New Class]
からクラスを作成します。
入力は以下を参考にしてください。
項番 | 項目 | 入力値 | 説明 |
---|---|---|---|
1 | category | SampleDag | ATOMが属する分類を指定してください。この情報はクラスエディタでの階層表示のためのタグとしてのみ使用されます。 |
2 | workspace | 未指定 | ワークスペースを設定します。 |
3 | name | TicketInstance | ATOMの名前を指定します(クラス名と同義)。ユニークである必要があります。 |
メタ情報の定義
persistence
、abstract
、api_generation
などのその他メタ情報にはdefault値が設定されています。今回はdefaultの設定とします。
項番 | 項目 | 入力値 | 説明 |
---|---|---|---|
1 | persistence | 選択(default) | 永続化モードを指定します。選択した場合、ATOM定義を保存するとクラス構造に対応するデータベーステーブルが自動的に生成され、データベースに対するCRUDの組込みメソッドが実装されます。 |
2 | abstract | 未選択(default) | 抽象クラスモードを指定します。 |
3 | api_generation | 選択(default) | API自動生成モードを指定します。API自動生成モードでは、ATOMをCRUDするためのRestful APIが自動的に生成されます。 |
identifier fieldの定義
[attributes]>[identifier]
に以下を入力します。
項番 | 項目 | 入力値 | 説明 |
---|---|---|---|
1 | title | ticketId | フィールド名を指定します |
2 | field_type | integer | フィールドの型を指定します。 |
3 | field_persistence | 選択(default) | フィールドの永続化モードを指定します。選択しない場合は本フィールドはデータベースカラムとして生成されません。 |
4 | field_immutable | 選択(default) | フィールドの値が不変かを指定します。選択した場合、本フィールドの値変更は許可されません。 |
5 | field_metadata* | {"POST": false} | フィールドに任意のメタ情報を定義します。メタ情報はdict型のデータを設定してください。 |
local fieldsの定義
以下を入力します。
項番 | 項目 | 入力値 | 説明 |
---|---|---|---|
1 | field_name | status | identifier field参照 |
2 | field_type | [String]を選択(default) | identifier field参照 |
3 | field_persistence | 選択(default) | identifier field参照 |
4 | field_nullable | 選択(default) | フィールドの値にNullを許容するかを指定します。 |
5 | field_immutable | 未選択(default) | identifier field参照 |
6 | field_unique | 未選択(default) | フィールドが複合ユニークインデックスの対象かを指定します。 |
7 | field_format* | '[a-zA-Z0-9]' | フィールドのフォーマットをjsonschemaもしくは正規表現で指定します。 |
8 | field_metadata* | {"POST": true, "PUT": true} | identifier field参照 |
同様に以下のlocal fields
も定義します。
- instanceName
- cause
- resolution
カウンタの作成
[Transaction as a Service]>[Transaction Settings]
の画面右上にある[+]
ボタンを押下すると、カウンタ作成画面が開きます。
以下の項目を入力してください。
項番 | 項目 | 入力値 | 説明 |
---|---|---|---|
1 | counter_type | Number | カウンタの種類を指定します。 Number、UUID、Inventoryの3つのタイプがあります。 |
2 | workspace | 未入力 | ワークスペースを設定します。 |
3 | counter_name | ticketId | カウンタを一意に識別する名前を指定します。 |
4 | counter_format | $ | カウンタから返される値の形式を指定します。値は$で表されます。 |
5 | min_num | 1 | カウンタの最小値を指定します。 |
6 | max_num | 9999 | カウンタの最大値を指定します。 |
7 | padding | 選択しない | ゼロパディングモードを指定します。 |
メソッド(instance_methods)の定義
以下を上記で定義したVmInstanceのinstance_methods
のmethod_body
に入力する。
async def initialize(self, *args, **kwargs):
if not self.ticketId:
self.ticketId = int(await Counter.allocate("ticketId"))
ATOMの確認
上記でATOM
の用意ができました。
Interactive shellから、以下の入力をしてみて動作を確認してみましょう。
# ticketInstanceの作成
>>> ticket_instance = await atom.TicketInstance()↵
... ↵
↵
>>> print(ticket_instance)↵
... ↵
↵
# ticketIdが割り当てられ、インスタンスが作成されていることを確認
TicketInstance(instance='VGlja2V0SW5zdGFuY2U6ZmY0NDMzMzY2M2FjMTFmMDkyYjhhZTRlZDk2Njg1ZTQ=', xid=None, xname=None, ticketId=37, status=None, instanceName=None, cause=None, resolution=None)
Routingの実装
実装内容の確認
本チュートリアルで実装するRoutingは以下です。
/SampleDag/{projectId}/{zone}/{instanceName}
→https://compute.googleapis.com/compute/v1/projects/{projectId}/zones/{zone}/instances/{instanceName}
このRoutingはGCEのインスタンス情報をGETする際に使用します。
Routingの設定
API開発チュートリアルのRouting設定を参考に以下のパラメータを設定する。
項番 | 項目 | 入力値 |
---|---|---|
1 | workspace | 未設定 |
2 | domain | default(デフォルト) |
3 | scheme(proxy) | http(デフォルト) |
4 | path(proxy) | /SampleDag/{projectId}/{zone}/{instanceName} |
5 | authority | compute.googleapis.com |
6 | scheme(target) | https |
7 | path(target) | /compute/v1/projects/{projectId}/zones/{zone}/instances/{instanceName} |
以上でRoutingの設定は終了です。
Scenarioの実装
実装内容確認
障害対応システムのScenario実装に進む前にSample CRUD APIについて確認し、再利用するものと新しく開発するものを確認をします。
Sample CRUD APIでは以下5つのScenarioを実装しました。
アクセストークン取得 : アクセストークンを取得する(本チュートリアルでも再利用)
CREATE : ComputeEngineのインスタンスを作成する(本チュートリアルでも再利用)
READ : SDKのATOMからComputeEngineインスタンスの情報を取得する
UPDATE : ComputeEngineのインスタンスのMachineTypeを更新する(本チュートリアルでも再利用)
DELETE : ComputeEngineのインスタンスを削除する
本チュートリアルでは以下の2つのScenarioを新たに実装します。
- Comupute EngineのVMインスタンス情報を取得するScenario
- ComputeEngineの障害を検知し、復旧のためのDAGを呼び出すScenario
- Compute Engineのインスタンス情報を取得
- 障害検知
- 障害対応するためのDAGを呼び出し、インスタンスを作成、ワークフローの実行
Scenarioの作成
Comupute EngineのVMインスタンス情報を取得するScenarioの作成
以下のように[Workflow Scenario as a Service]>[Scenario]>[Create New Scenario]
から新たなScenarioを作成します。
作成画面を開いた後、以下のScenario定義を設定し、Scenarioを作成します。
項番 | 項目 | 入力値 |
---|---|---|
1 | category | SampleDag |
2 | workspace | 未入力 |
3 | name | GetInstance |
4 | method | GET |
5 | uri | /sample_dag/instances/{gce_id} |
6 | transaction | 未選択 |
7 | routing_auto_generation_mode | 選択 |
次にscriptブロックの追加を行う。
- アクセストークンの取得
Display Label: get access token
Python code:
r = await callout(
# API開発チュートリアルで作成したアクセストークンの取得APIを使用
path="/sample_crud_api/get_access_token",
method="GET"
)
if r.error:
raise Error(r.code, reason="GET failed")
access_token = json.loads(r.body)["access_token"]
- SDK内のATOMインスタンスから対象のインスタンス情報を取得
Display Label: get instance from SDK
Python code:
gce_id = context.request.resources.gce_id
# 検知対象のインスタンス情報を取得する
get_instances = await atom.VmInstance.retrieve(gceId=gce_id)
get_instance = get_instances[0]
- Compute Engineからインスタンス情報を取得
Display Label: get instance from GCE
Python code:
r = await callout(
path=f"/SampleDag/{get_instance.projectId}/{get_instance.zone}/{get_instance.instanceName}",
method="GET",
headers={
"Authorization": f"Bearer {access_token}"
}
)
instance_info = json.loads(r.body.decode("utf-8"))
context.session.finish({"instance_info": instance_info})
次にVariables
に以下の変数を設定する
- get_instance
- access_token
動作確認
CREATE APIを利用してGCE上にインスタンスを作成してください。 その後、Interactive shellで以下のようにインスタンス情報を確認します。
# VmInstance情報を全て取得
>>> instance = await atom.VmInstance.retrieve()↵
... ↵
↵
>>> print(instance)↵
... ↵
↵
[VmInstance(instance='Vm1JbnN0YW5jZTozNGI2ZDVjNDg5NGExMWYwOWMyM2Q2MzdmZDljNjEzYw==', xid=None, xname=None, instanceId=19, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status='RUNNING', gceId='********')]
>>>
GCE上のインスタンスID(gceId)
が確認できたら、コピーする。
次にGetInstanceの「Try API Call」からurl resources
にコピーしたgceId
を入力して、APIを呼び出す。
レスポンスでインスタンス情報が取得できれば正常に動作しています
ComputeEngineの障害を検知し、障害対応のためのDAGを呼び出すScenarioの作成
作成画面を開いたのち、以下のScenario定義を設定し、Scenarioを作成します。
項番 | 項目 | 入力値 |
---|---|---|
1 | category | SampleDag |
2 | workspace | 未入力 |
3 | name | DetectSystemFailure |
4 | method | GET |
5 | uri | /sample_dag/instances/{gce_id}/detect |
6 | transaction | 未選択 |
7 | routing_auto_generation_mode | 選択 |
scriptブロックの追加
- SDK内のATOMインスタンスから対象のインスタンス情報を取得
Display Label: get instance from SDK
Python code:
gce_id = context.request.resources.gce_id
# 検知対象のインスタンス情報を取得する
get_instances = await atom.VmInstance.retrieve(gceId=gce_id)
get_instance = get_instances[0]
- Compute Engineからインスタンス情報を取得
Display Label: get instance from GCE
Python code:
# GCEのインスタンス情報を取得
r = await callout(
path=f"/sample_dag/instances/{get_instance.gceId}",
method="GET"
)
response = r
- 取得できたインスタンス情報から障害検知
Display Label: detect instance failure
Python code:
import os
# インスタンス情報取得時にerrorがあった場合は障害復旧
if response.code == 599:
cause = "Failure in GCE system"
resolution = "Change status of VmInstance"
else:
instance_info = json.loads(response.body.decode("utf-8"))["instance_info"]
if "error" in instance_info:
response_code = instance_info["error"]["code"]
# エラーコードレスポンスにより異常を検知し、workflowを実施
if response_code == 404:
cause = "Instance not found"
resolution = "Create the GCE instance"
else:
# インスタンスのマシンタイプを取得する
instance_machine_type = os.path.basename(instance_info["machineType"])
# ATOMとGCEのmachineType情報を比較
if get_instance.machineType != instance_machine_type:
cause = "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType"
resolution = "Update machineType of GCE instance"
else:
cause = "null"
resolution = "null"
- 障害復旧のためのDAGのインスタンス作成、ワークフロー実施
Display Label: start dag workflow
Python code:
if cause != "null":
# DAGインスタンスの作成・ワークフロー開始
dag_Instance_Id = await context.createDagInstance(dagName="RecoveryWork", gce_id=gce_id, cause=cause, user_name="user", operator_name="operator", msg_type="error", resolution=resolution)
result = {
"cause": cause,
"dag_instance_id": dag_Instance_Id
}
context.session.finish({"detect_result": result})
else:
result = {
"cause": "null"
}
context.session.finish({"detect_result": result})
Caution
DAGインスタンスの作成・ワークフローの実行を行っている「createDagInstance」は Functionプラグインにて実装
Variablesの設定
以下の変数をVariables
に設定する。
- get_instance
- access_token
- gce_id
- resolution
- cause
- response
最後に本Scenarioのstart dag workflow
で使用されている Function
を実装します。
Function
は[Workflow Scenario as a Service]>[Bultin]>[Create New Function]
から新規作成する。
作成画面を開いたのち、以下の定義を設定し、新たなFunction
を作成します。
項目 | 入力値 |
---|---|
category | SampleDag |
workspace | 未入力 |
name | createDagInstance |
次にFuction
のスクリプトに以下を設定します。
async def createDagInstance(
dagName: str,
gce_id: str,
cause: str,
user_name: str,
operator_name: str,
msg_type: str,
resolution: str
):
# 対象のインスタンスを抽出
get_instances = await atom.VmInstance.retrieve(gceId=gce_id)
get_instance = get_instances[0]
# DAGインスタンスの作成・ワークフロー開始
dag = await DAG.load(dagName)
dag_instance = dag.createInstance()
dag_instance.run(
gce_id=gce_id,
instance_name=get_instance.instanceName,
cause=cause,
user_name=user_name,
operator_name=operator_name,
dag_instance_id=dag_instance.instance,
msg_type=msg_type,
resolution=resolution
)
return dag_instance.instance
Note
dag_instance.run()の引数について:
dag_instance.run()は生成されたDAGインスタンスのワークフローをスタートさせるためのメソッドです。
引数にはmetadata(詳細はDAGの実装に記載)を設定できます。
障害検知・対応のScenarioの動作確認はfaker
や障害の模擬が必要になるため、UnitTest
や結合試験の章で行います。
以上でScenarioの実装は終了です。
DAGの実装
実装内容確認
本チュートリアルで実装するDAGのワークフローは以下になります。
Scenarioによって障害が検知され、その際の障害の基本情報をmetadata
として受け取り、上記のワークフローが実行されます。
Note
metadataとは?:
DAGワークフローの全てのタスクが共用できるグローバル名前空間。
各Taskの実行時にmetadataに存在する変数が引数として自動的にバインドされます。
metadataは、DAG定義時、DAGInstance生成時、DAGInstance走行開始時に指定することが可能です。
各Taskでmetadataを辞書オブジェクトして返却すると、metadataにマージされ、後続のTaskでも利用が可能になります。
参考:プログラミングガイド>DAG>DAGのメタデータ
本チュートリアルでのDAGの役割としては以下の3つになります。
- User対応
- 障害・復旧通知
- Operator対応
- 障害・復旧通知、復旧作業確認、Ticket対応
- 障害対応
- 障害復旧
流れとしてはまず障害を検知した時点で、以下3つの処理を行います。
- Userへの障害通知
- Operatorへの障害および復旧作業確認通知
- 障害を管理するためのTicket作成
次にDAGの承認待ち機能
を活用して、復旧作業前にOperatorによる承認待ちを行います。
承認後、検知された障害に対応した復旧作業を実施し、復旧後にOperatorへ復旧内容と復旧確認のための通知をします。
通知を受け取ったOperatorは復旧が完了したかどうかを確認して、承認を行います。
承認後、Userへの復旧通知とTicketの更新が行われるという流れになります。
Note
承認待ち機能とは?:
DAGのTaskには承認待ち機能を持たせることができます。
承認待ち機能をもったTaskは利用者による承認行為がなされるまで、Pendingされます。
この承認待ち機能はZTOのユースケースでも利用されています。
最終的にはZTOによって全ての作業を自動化することが目標ですが、全てを自動化するにはステップが必要です。
例えば、大量のユーザを抱えたシステムにいきなりZTOシステムを導入した場合、障害復旧時に自動的に選択された復旧作業によってさらなる障害を招く可能性があります。
それらを防ぐため、この承認待ち機能が利用されます。
承認待ち機能によって、自動的に調査・選択された障害原因・対応が正常かどうか人の手で確認できます。
完全なZTOの実現の前段階として、承認待ち機能は有効です
本チュートリアルで対応する障害を改めて確認すると以下の3つになります。
障害内容 | 障害対応内容 |
---|---|
SDK内ではVM情報が存在するが、Compute Engine上には存在しない場合 | SDK内のインスタンス情報を参考に、CREATE API(API開発チュートリアルで作成済み)でGCE上にインスタンス作成 |
Compute Engineのシステム自体に障害が発生し、インスタンス情報が取得できない | ATOMのインスタンス情報にGCEに発生したシステム障害の旨を記録することで対応 |
GCEとSDK内のインスタンス情報(MachineType)が異なる | SDK内のインスタンス情報を参考に、UPDATE API(API開発チュートリアルで作成済)でGCEのインスタンス情報を更新 |
それではDAGの実装に進みます。
Slack Appの作成とWorkspaceへのインストール、チャンネルの作成
デモシステムに必要なSlack Appの作成とWorkspaceへのインストールを行います。
詳細な方法は各自で調査してください。
まずは障害や復旧の通知を行うためのSlack Appを作成しWorkspaceにインストールします。
権限としてはScopes>Bot Token Scopes
にchat:write
を設定します。
Workspaceにインストールできたらチャンネルの作成を行います。
今回使用するチャンネルはUser用とOperator用の2つのチャンネルを作成します。
チャンネルの作成後、チャンネルへAppを追加し、インストールしたAppをチャンネルに追加します。
本チュートリアルでは「dag_user」
と「dag_operator」
というチャンネルを使用します。
上記の名前以外のチャンネルを使用する際には、Functionに自身が作成したチャンネル名を記載する必要があります。
最後にSDK内部からSlackのAppを使用するために必要なOAuth Tokens
(xoxbから始まるもの)をメモします。
Functionプラグインの作成
DAGのワークフローを実装する前に、ワークフローのタスクに設定するFunctionプラグインを作成します。
Userへのslack通知用のFunctionの作成
まず[Workflow Scenario as a Service]>[Dag(Resources)]
に移動します。
次に以下の画像のようにPlugin
のCreate New Function
から新規Functionを作成します。
作成画面を開いたのち、以下の定義を設定し、新たなFunctionを作成します。
項目 | 入力値 |
---|---|
category | SampleDag |
workspace | 未入力 |
name | notificationToUser |
次にFuctionのスクリプトに以下を設定します。(必要に応じてOAuth token
を書き換えてください)
import os
async def notificationToUser(
__instance__: str,
__identifier__: str,
__reverse__: bool,
cause: str,
user_name: str,
instance_name: str,
msg_type: str
) -> dict:
# slack通知に必要なパラメータの設定
username=f"Qmonus {options.version} {os.uname()[1]}"
send_chan = "#dag_user" # チャンネル名
emoji=options.slack_notify_emoji
color="#7CD197"
token="xoxb-*****" # OAuth Token(自身が用意したTokenに書き換えてください)
pretext="SampleDag"
msg=f'To: {user_name}\n' + f'MsgType: {msg_type}\n' + f'InstanceName: {instance_name}\n' + f'cause: {cause}\n'
params=dict(
channel=send_chan,
username=username.encode("utf-8"),
icon_emoji=emoji,
attachments=[
dict(
fallback=username,
color=color,
pretext=pretext,
text=msg.encode("utf-8")
)
]
)
# slack通知
r=await callout(
url="https://slack.com/api/chat.postMessage",
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
method=POST,
body=params
)
return {
__identifier__: {
"send_chan": send_chan,
"user_name": user_name,
"msg": msg
}
}
その他のFunctionの作成
同様に本チュートリアルで使用するFunctionを作成します。
以下の表を参考に7個のFunctionを作成してください。
category
とworkspace
はnotificationToUser
と同様のものを設定してください。
name: notificationToOperator
code:
import os
async def notificationToOperator(
__instance__: str,
__identifier__: str,
__reverse__: bool,
cause: str,
operator_name: str,
instance_name: str,
msg_type: str,
resolution: str
) -> dict:
# slack通知に必要なパラメータの設定
username=f"Qmonus {options.version} {os.uname()[1]}"
send_chan = "#dag_operator" # チャンネル名
emoji=options.slack_notify_emoji
color="#7CD197"
token="xoxb-*****" # OAuth Token(自身が用意したTokenに書き換えてください)
pretext="SampleDag"
msg = f'To: {operator_name}\n' + f'MsgType: {msg_type}\n' + f'InstanceName: {instance_name}\n' + f'cause: {cause}\n' + f'DagInstanceId: {__instance__}\n' + f'Resolution: {resolution}\n'
params=dict(
channel=send_chan,
username=username.encode("utf-8"),
icon_emoji=emoji,
attachments=[
dict(
fallback=username,
color=color,
pretext=pretext,
text=msg.encode("utf-8")
)
]
)
# slack通知(tokenには自身が作成したOAuth Tokensを設定)
r = await callout(
url="https://slack.com/api/chat.postMessage",
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
method=POST,
body=params
)
return {
__identifier__: {
"send_chan": send_chan,
"operator_name": operator_name,
"msg": msg
}
}
name: createTicketInstance
code:
async def createTicketInstance(
__instance__: str,
__identifier__: str,
__reverse__: bool,
cause: str,
instance_name: str,
resolution: str
) -> dict:
# TicketInstanceの作成
ticket_instance = await atom.TicketInstance(status = "New", cause = cause, instanceName = instance_name, resolution = resolution)
await ticket_instance.save()
return {
__identifier__: {
"ticket": ticket_instance
},
"ticket_id": ticket_instance.ticketId
}
name: awaitApproval
code:
def awaitApproval(
__instance__: str,
__identifier__: str,
__reverse__: bool,
name: str
) -> dict:
# 承認者の名前をmetadataに追加
return {
__identifier__: {
"Approved person's name": name
}
}
name: createInstance
code:
async def createInstance(
__instance__: str,
__identifier__: str,
__reverse__: bool,
cause: str,
instance_name: str,
gce_id: str
) -> dict:
# SDK内のインスタンスを取得
target_instances = await atom.VmInstance.retrieve(gceId=gce_id)
target_instance = target_instances[0]
# GCEにインスタンスが存在しない障害か確認
if cause == "Instance not found":
# 削除されている場合、対象のインスタンスを作成
r = await Scenario.run("InstanceCreate", method=POST, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName)
CHECK_INTERVAL = 1
TIMEOUT = 360
# インスタンス作成のScenarioの処理が完了したかをポーリング
for i in range (TIMEOUT):
get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)
instance_info=json.loads(get_instance.body.decode("utf-8"))["instance_info"]
if not ("error" in instance_info):
break
await asyncio.sleep(CHECK_INTERVAL)
# ATOMのインスタンスのgceIdを更新
target_instance.gceId = instance_info["id"]
await target_instance.save()
return {
__identifier__: {
"instance_info": instance_info
},
"msg_type" : "recovery"
}
else:
return{
__identifier__: "not worked"
}
name: updateMachineType
code:
import os
async def updateMachineType(
__instance__: str,
__identifier__: str,
__reverse__: bool,
cause: str,
instance_name: str,
gce_id: str
) -> dict:
if cause == "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType":
# SDK内のインスタンスを取得
target_instances = await atom.VmInstance.retrieve(gceId=gce_id)
target_instance = target_instances[0]
# SDKとGCEのインスタンス情報でどこのfieldが異なるのか"cause"から確認
target_field = cause.split(':')[1]
if target_field == "machineType":
# SDKのインスタンス情報をもとにGCEインスタンスをアップデート
r = await Scenario.run("InstanceUpdate", method=PUT, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName, machine_type=target_instance.machineType)
CHECK_INTERVAL = 5
TIMEOUT = 120
# インスタンス作成のScenarioの処理が完了したかをポーリング
for i in range (TIMEOUT):
get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)
instance_info=json.loads(get_instance.body.decode("utf-8"))["instance_info"]
instance_machine_type = os.path.basename(instance_info["machineType"])
if instance_machine_type == target_instance.machineType:
break
await asyncio.sleep(CHECK_INTERVAL)
return {
__identifier__: {
"instance_info": target_instance
},
"msg_type" : "recovery"
}
else:
return{
__identifier__: "not worked"
}
name: updateATOMInstance
code:
async def updateATOMInstance(
__instance__: str,
__identifier__: str,
__reverse__: bool,
cause: str,
instance_name: str,
gce_id: str
) -> dict:
if cause == "Failure in GCE system":
# SDKのインスタンスを取得
target_instances = await atom.VmInstance.retrieve(gceId=gce_id)
target_instance = target_instances[0]
# インスタンスのstatus情報を設定し直す
target_instance.status = "GCE SYSTEM ERROR"
await target_instance.save()
return {
__identifier__: {
"instance_info": target_instance
},
"msg_type" : "recovery"
}
else:
return{
__identifier__: "not worked"
}
name: updateTicketInstance
code:
async def updateTicketInstance(
__instance__: str,
__identifier__: str,
__reverse__: bool,
ticket_id: int,
) -> dict:
# 対象のTicketInstanceの取り出し、statusをResolveに更新
ticketInstances = await atom.TicketInstance.retrieve(ticketId=ticket_id)
ticketInstance = ticketInstances[0]
ticketInstance.status = "Resolve"
await ticketInstance.save()
return {
__identifier__: {
"update_ticket": ticketInstance
}
}
以上でDAGのワークフローに必要なfunctionの作成が終わりました。
ワークフローの作成
ここからDAGのワークフローの実装に入ります。
本チュートリアルで実装するワークフローは以下になります(再掲)
まずは[Workflow Scenario as a Service]>[Dag(Resources)]
に移動します。
以下の画面のようにCreate DAG
をクリックします。
作成画面を開いたのち、以下の定義を設定し、DAGを作成します。
項目 | 入力値 |
---|---|
category | SampleDag |
workspace | 未入力 |
name | RecoveryWork |
autoReverseOnFailure | 未選択 |
以下の画像のように新しく作成したDAGが左側の一覧に追加されます。
それではDAGへタスクの追加を行なっていきます。
左側のDAG一覧からRecoveryWork
をクリックし、中央の画面をRecoveryWork
のワークフロー作成画面に遷移させる。
ワークフローの作成画面をクリックすると以下のようにCreate New Task
のボタンが生成されるので、クリックし作成画面に遷移する。
作成画面を開いたのち、以下の定義を設定し、タスクを作成します。
- Userへの障害通知
項目 | 入力値 | 詳細 |
---|---|---|
identifier | task_00(デフォルト) | DAGタスクの識別子です。 |
label | Userへの障害通知 | 当該タスクの注釈や処理概要を記載できるユーザラベルです。 |
operator | FunctionPluginOperator(デフォルト) | DAGタスクで実行する処理を提供するオペレータを定義します。 1. FunctionPluginOperator: 特定のFunctionプラグインへの参照 2. ModulePluginOperator: 特定のModuleプラグインに含まれる特定の関数への参照 3. ThirdPartyPluginOperator: 特定の持ち込みpythonパッケージ内の特定モジュールに含まれる特定の関数への参照 4. DagPluginOperator: 特定のDAGプラグインへの参照 |
funcname | notificationToUser | DAGフローがdownstream方向に稼働している場合に当該タスクが実行する関数を指定します。 |
approvalRequired | 未選択(デフォルト) | 当該タスクの実行に際してユーザからの承認行為が必要か否かを指定するフラグです。デフォルトはFalseです。 |
idempotency | 未選択(デフォルト) | 当該タスクの冪等性の有無を宣言するフラグです。デフォルトはFalseです。 |
dependsOn | 未選択(デフォルト) | DAGタスクの依存先タスクの識別子のリストを指定します。 |
タスクを作成すると以下のような画面になり、ワークフローの作成画面にタスクが追加されます。
同様に以下2つのタスクを追加します。
- Operatorへの障害通知
項目 | 入力値 |
---|---|
identifier | task_01(デフォルト) |
label | Operatorへの障害通知と復旧作業確認 |
operator | FunctionPluginOperator(デフォルト) |
funcname | notificationToOperator |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | 未選択(デフォルト) |
- 障害管理のためのTicket作成
項目 | 入力値 |
---|---|
identifier | task_02(デフォルト) |
label | Ticket作成 |
operator | FunctionPluginOperator(デフォルト) |
funcname | createTicketInstance |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | 未選択(デフォルト) |
上記3つのタスクを作成した後のワークフロー作成画面は以下のようになります。
これら3つのタスクには依存関係がないので、障害発生時に並列で行われます。
- Operatorによる復旧承認待ち処理
本チュートリアルのDAGでは障害の発生と復旧作業の通知を受けたOperatorが復旧作業を開始するための承認を行います
その承認待ちを行うタスクを以下の定義から作成します
項目 | 入力値 |
---|---|
identifier | task_03(デフォルト) |
label | Operatorの承認待ち |
operator | FunctionPluginOperator(デフォルト) |
funcname | awaitApproval |
approvalRequired | 選択 |
idempotency | 未選択(デフォルト) |
dependsOn | task_00, task_01, task_02 |
上記のタスクを作成すると、ワークフローの全体像は以下のようになります。
dependsOn
のパラメータにタスクを設定したことで、タスク同士の依存関係が存在するワークフローになりました。
また、本タスクはapprovalRequired
を選択し、承認が必要なタスクのため、左上に承認待ちタスクのマークがついていることが確認できます。
次に対象の3つの障害に対する復旧作業のタスクを作成します。
- インスタンスの作成
本タスクの対象障害は「Compute Engineに指定のVMインスタンスが存在しない」場合となります。
以下の定義からタスクを作成します。
項目 | 入力値 |
---|---|
identifier | task_04(デフォルト) |
label | 障害対応_インスタンスの作成 |
operator | FunctionPluginOperator(デフォルト) |
funcname | createInstance |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | task_03 |
- ATOMインスタンスのstatus更新
本タスクの対象障害は「Compute Engineのシステム自体に障害が発生し、VMインスタンス情報が取得できない」場合となります。
以下の定義からタスクを作成します。
項目 | 入力値 |
---|---|
identifier | task_05(デフォルト) |
label | 障害対応_ATOMインスタンスの更新 |
operator | FunctionPluginOperator(デフォルト) |
funcname | updateATOMInstance |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | task_03 |
- GCEインスタンスのmachineType更新
本タスクの対象障害は「Compute Engineから取得したVMインスタンスのmachineTypeと、対応するATOMインスタンスのmachineTypeが異なる」場合となります。
以下の定義からタスクを作成します。
項目 | 入力値 |
---|---|
identifier | task_06(デフォルト) |
label | 障害対応_GCEインスタンスのmachineTypeの更新 |
operator | FunctionPluginOperator(デフォルト) |
funcname | updateMachineType |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | task_03 |
上記3つの障害復旧作業のタスクを作成すると、ワークフローの全体像は以下のようになります。
3つの復旧作業がOperatorの承認後に実施されることがわかります。
- Operatorへの復旧確認通知
復旧作業が実施された後にOperatorへ復旧内容の確認を行うため通知を行うタスクを作成します。
以下の定義からタスクを作成します。
項目 | 入力値 |
---|---|
identifier | task_07(デフォルト) |
label | Operatorへの復旧内容通知 |
operator | FunctionPluginOperator(デフォルト) |
funcname | notificationToOperator |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | task_04, task_05, task_06 |
上記のタスクを作成すると、ワークフローの全体像は以下のようになります。
3つの復旧作業とOperatorへの通知のタスクに依存関係があることがわかります。
- Operatorによる復旧完了承認待ち
復旧作業が完了した後に、復旧作業が完了したかを確認するためOperatorによる承認待ちのタスクを以下の定義より作成します。
項目 | 入力値 |
---|---|
identifier | task_08(デフォルト) |
label | Operatorによる承認待ち |
operator | FunctionPluginOperator(デフォルト) |
funcname | awaitApproval |
approvalRequired | 選択 |
idempotency | 未選択(デフォルト) |
dependsOn | task_07 |
上記のタスクを作成すると、ワークフローの全体像は以下のようになります。
Operatorによる復旧内容の承認がされた後、システムの復旧をUserへ通知し、障害検知時に作成されたTicketのstatus
を更新します。
- Userへのシステム復旧通知
以下の定義よりタスクを作成します。
項目 | 入力値 |
---|---|
identifier | task_09(デフォルト) |
label | Userへのシステム復旧通知 |
operator | FunctionPluginOperator(デフォルト) |
funcname | notificationToUser |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | task_08 |
- Ticketの更新
以下の定義よりタスクを作成します。
項目 | 入力値 |
---|---|
identifier | task_10(デフォルト) |
label | Ticketの更新 |
operator | FunctionPluginOperator(デフォルト) |
funcname | updateTicketInstance |
approvalRequired | 未選択(デフォルト) |
idempotency | 未選択(デフォルト) |
dependsOn | task_08 |
以上2つのタスクを作成すると、ワークフローの全体像は以下のようになります。
以上でDAGのワークフローを作成することができました。
Testの実装
Test内容確認
ここまでで、ScenarioとDAGのワークフローを実装しました。
それらのシステムが正常に動作するのか、単体試験と結合試験を行います。
これから実装するTestの概要は以下になります。
単体試験
単体試験はAPI開発チュートリアル同様にUnitTest
によって行います。
UnitTest
の実装手順は以下になります。
Faker
の作成- テスト実行時に、外部サービスとの実際の通信を行わずに、あらかじめ定義した疑似動作(フェイク関数)を実行して応答を模擬する仕組みです。
Illusion
の作成- Fakerで定義されたフェイク関数群を、テストシーンに適用するための設定・割り当ての仕組みです。これにより、特定のテスト実行時に外部環境の動作を模擬できます。
TestCase
の作成- 一つのテストシナリオを構成する単位で、入力条件や期待する出力、検証(assert)処理などを記述し、個別の機能やプラグインの動作をテストします
上記のUnitTest
をScenarioとDAGに実装することによって、各障害ケースにてScnarioとDAGが正常な処理ができるか確認します。
結合試験(実施自体には前提条件が必要だが、目を通すことを推奨)
結合試験では実際に障害を発生させ、Scenarioによる障害検知からDAGによる障害復旧までの一連の処理が正常に動作するかを確認します。
結合試験ではFakerやIllusionを使用しないため、以下ができることが前提条件となります。
- Google CloudでComputeEngineの権限を持つサービスアカウントを作成している。
- Google CloudのCompute Engine APIを用いてCompute EngineインスタンスをCRUDすることができる。
- SlackのAppを作成し、WorkSpaceにインストールできる。
Caution
結合試験を実施する上での前提条件とはなりますが、DAGのワークフロー、taskの遷移の様子を確認できるので目を通すことをお勧めします。
Fakerの実装
実装内容確認
本チュートリアルでは、以下のFaker
を用意します。
faker名 | 想定する用途 |
---|---|
getAccessToken | (API開発チュートリアルで実装済み)疑似的にGETしてGoogle Cloud APIのアクセストークンを取得した際の挙動をする |
getInstanceFromGCE | GCEのVMインスタンス情報を取得するAPIを模擬する |
getInstanceByScenario | GCEのVMインスタンス情報を取得するScenarioを模擬する |
createDagInstance | DAGインスタンスを作成するfunctionを模擬する |
postSlack | slackに対して通知を行う処理を模擬する |
recoveryCreateInstance | 障害の復旧作業として、GCEのVMインスタンス作成を模擬する |
recoveryUpdateMachineType | 障害の復旧作業として、GCEのVMインスタンスのMachineTypeの更新を模擬する |
Fakerの作成
それではFakerを一つ一つ作成していきましょう。
Unit Test as a Service>Faker > Create New Faker
からFakerを作成することができます。
Fakerの作成方法の詳細はAPI開発チュートリアルを参考にしてください。
本チュートリアルでは以下のFakerを作成してください。
- faker①
name: getAccessToken (API開発チュートリアルで実装済みのため追加実装不要) category: SampleCrudApi
fakes:
async def Success(*args, **kwargs):
return FakeHttpResponse(
200,
body=json.dumps({"access_token": "xxxxx"}).encode("utf-8")
)
async def InternalError(*args, **kwargs):
return FakeHttpResponse(500)
async def NotFound(*args, **kwargs):
return FakeHttpResponse(404)
- faker②
name: getInstanceFromGCE
category: SampleDag
fakes:
async def Success(*args, **kwargs):
res_body = {
"id": "111"
}
return FakeHttpResponse(
200,
body=json.dumps(res_body).encode("utf-8")
)
- faker③
name: getInstanceByScenario
category: SampleDag
fakes:
async def Success(*args, **kwargs):
res_body = {
"instance_info": {
"id": "222",
"machineType": "https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-medium"
}
}
return FakeHttpResponse(
200,
body=json.dumps(res_body).encode("utf-8")
)
async def InstanceNotFound(*args, **kwargs):
res_body = {
"instance_info": {
"error": {
"code": 404
}
}
}
return FakeHttpResponse(
200,
body=json.dumps(res_body).encode("utf-8")
)
async def GCEFailure(*args, **kwargs):
return FakeHttpResponse(
599
)
async def MachineTypeMismatch(*args, **kwargs):
res_body = {
"instance_info": {
"machineType": "https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-small"
}
}
return FakeHttpResponse(
200,
body=json.dumps(res_body).encode("utf-8")
)
- faker④
name: createDagInstance
category: SampleDag
fakes:
async def Success(*args, **kwargs):
return "111"
- faker⑤
name: postSlack
category: SampleDag
fakes:
async def Success(*args, **kwargs):
return FakeHttpResponse(200)
- faker⑥
name: recoveryCreateInstance
category: SampleDag
fakes:
async def Success(*args, **kwargs):
return FakeHttpResponse(200)
- faker⑦
name: recoveryUpdateMachineType
category: SampleDag
fakes:
async def Success(*args, **kwargs):
return FakeHttpResponse(200)
Fakerの挿入
今回用意したFakerの挿入箇所をまとめておきます。
以下を参考に、それぞれのScenarioに対してFakerを挿入してください。
Scenario: GetInstance
- Toyblock①: get access token
faker: getAccessToken
挿入するcallout関数:
faker("getAccessToken")
r = await callout(
# API開発チュートリアルで作成したアクセストークンの取得APIを使用
path="/sample_crud_api/get_access_token",
method="GET"
)
- Toyblock②: get instance from GCE
faker: getInstanceFromGCE
挿入するcallout関数:
faker("getInstanceFromGCE")
r = await callout(
path=f"/SampleDag/{get_instance.projectId}/{get_instance.zone}/{get_instance.instanceName}",
method="GET",
headers={
"Authorization": f"Bearer {access_token}"
}
)
Scenario: DetectSystemFailure
- Toyblock①: get instance from GCE
faker: getInstanceByScenario
挿入するcallout関数:
# GCEのインスタンス情報を取得
faker("getInstanceByScenario")
r = await callout(
path=f"/sample_dag/instances/{get_instance.gceId}",
method="GET"
)
response = r
- Toyblock②: start dag workflow
faker: createDagInstance
挿入するcallout関数:
# DAGインスタンスの作成・ワークフロー開始
faker("createDagInstance")
dag_Instance_Id = await context.createDagInstance(dagName="RecoveryWork", gce_id=gce_id, cause=cause, user_name="user", operator_name="operator", msg_type="error", resolution=resolution)
以上でScenarioへの挿入は完了です。
次に以下を参考にして、それぞれのFunctionに対してFakerを挿入してください。
Function①: notificationToUser
- faker: postSlack
挿入するcallout関数:
faker("postSlack")
r = await callout(
url="https://slack.com/api/chat.postMessage",
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
method=POST,
body=params
)
Function②: notificationToOperator
- faker: postSlack
挿入するcallout関数:
# slack通知(tokenには自身が作成したOAuth Tokensを設定)
faker("postSlack")
r = await callout(
url="https://slack.com/api/chat.postMessage",
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
method=POST,
body=params
)
Function③: createInstance
- faker: recoveryCreateInstance
挿入するcallout関数:
faker("recoveryCreateInstance")
r = await Scenario.run("InstanceCreate", method=POST, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName)
- faker: getInstanceByScenario
挿入するcallout関数:
faker("getInstanceByScenario")
get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)
Function④: updateMachineType
- faker: recoveryUpdateMachineType
挿入するcallout関数:
faker("recoveryUpdateMachineType")
r = await Scenario.run("InstanceUpdate", method=PUT, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName, machine_type=target_instance.machineType)
- faker: getInstanceByScenario
挿入するcallout関数:
faker("getInstanceByScenario")
get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)
以上でFakerの実装が完了です。
Illusionの実装
実装内容確認
本チュートリアルでは、以下のIllusion
を用意します。
Illusion名 | 想定する場合 |
---|---|
GetInstanceSuccess | ScenarioによるGCEのインスタンス情報の取得が成功する場合 |
DetectNoSystemFailure | Scenarioによる障害検知で障害が検知されない場合 |
DetectInstanceNotFound | Scenarioによる障害検知でGCEに指定のインスタンスが存在しない障害を検知した場合 |
DetectGCEFailure | Scenarioによる障害検知でGCEのシステム自体の障害を検知した場合 |
DetectMachineTypeMismatch | Scenarioによる障害検知でGCEとSDKでMachineTypeの情報が一致しない障害を検知した場合 |
RecoveryInstanceNotFound | DAGによるGCEに指定のインスタンスが存在しない障害へ復旧対応する場合 |
RecoveryGCEFailure | DAGによるGCEのシステム自体の障害へ復旧対応する場合 |
RecoveryMachineTypeMismatch | DAGによるGCEとSDKでMachineTypeの情報が一致しない障害へ復旧対応する場合 |
Illusionの作成
それではIllusionを一つ一つ作成していきましょう。
[Unit Test as a Service]>[Illusion]>[Create New Illusion]
からIllusionを新規作成することができます。
Illusionの詳細な作成方法はAPI開発チュートリアルを参考にしてください
本チュートリアルでは以下のIllusionを作成してください。
name | category | fakes |
---|---|---|
GetInstanceSuccess | SampleDag | getAccessToken - Success getInstanceFromGCE - Success |
DetectNoSystemFailure | SampleDag | getInstanceByScenario - Success |
DetectInstanceNotFound | SampleDag | getInstanceByScenario - InstanceNotFound createDagInstance - Success |
DetectGCEFailure | SampleDag | getInstanceByScenario - GCEFailure createDagInstance - Success |
DetectMachineTypeMismatch | SampleDag | getInstanceByScenario - MachineTypeMismatch createDagInstance - Success |
RecoveryInstanceNotFound | SampleDag | postSlack - Success recoveryCreateInstance - Success getInstanceByScenario - Success |
RecoveryGCEFailure | SampleDag | postSlack - Success |
RecoveryMachineTypeMismatch | SampleDag | postSlack - Success getInstanceByScenario - Success recoveryUpdateMachineType - Success |
以上でIllusionの実装が完了です。
Testcaseの実装
実装内容確認
本チュートリアルでは、以下のTestcaseを用意します。
Test Case | 対象Service | 適用するIllusion | 想定するケース |
---|---|---|---|
GetInstanceSuccess | Scenario | GetInstanceSuccess | GCEへのインスタンス情報取得APIが成功するケース |
DetectNoSystemFailure | Scenario | DetectNoSystemFailure | 障害検知APIで障害が検知されないケース |
DetectInstanceNotFound | Scenario | DetectInstanceNotFound | 障害検知APIでGCE上にVMインスタンスが確認できない障害を検知するケース |
DetectGCEFailure | Scenario | DetectGCEFailure | 障害検知APIでGCEのシステム自体の障害を検知するケース |
DetectMachineTypeMismatch | Scenario | DetectMachineTypeMismatch | 障害検知APIでGCEとSDKでMachineTypeの情報が一致しない障害を検知するケース |
RecoveryInstanceNotFound | DAG | RecoveryInstanceNotFound | DAGによるGCEに指定のインスタンスが存在しない障害へ復旧対応する場合 |
RecoveryGCEFailure | DAG | RecoveryGCEFailure | DAGによるGCEのシステム自体の障害へ復旧対応する場合 |
RecoveryMachineTypeMismatch | DAG | RecoveryMachineTypeMismatch | DAGによるGCEとSDKでMachineTypeの情報が一致しない障害へ復旧対応する場合 |
Test Caseの作成
それでは一つずつTestcaseを作成していきます。
[Unit Test as a Service]>[Test Case]>[Create New Testcase]
からTestcaseを作成することができます。
ScenarioとDAGではTestcaseの作成方法が異なります。
Scenarioに対するTestcaseの詳細な作成方法はAPI開発チュートリアルを参考にしてください。
まずはScenario
に対するTestcaseを以下の定義を参考に5つ作成してください。
Testcase①
Test Target Type: Scenario
name: GetInstanceSuccess
category: SampleDag
target: GetInstance
illusion: GetInstanceSuccess
Preparation:
async def preparation():
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName="test-instance",
gceId="111",
machineType="e2-medium"
)
# データベースに保存する
await test_instance.save()
Assert Begin:
async def hoge(*args, **kwargs):
pass
Scenario Input
- method: GET
- path: /sample_dag/instances/111
- headers:
{
"Content-Type": "application/json"
}
Scenario Input(script)
- get access token: -
- get instance from SDK: -
- get instance from GCE: -
Scenario Output:
async def assertion():
assert Response.code == 200, "Invalid response code %r" % Response.code
Scenario Output(script)
- get access token: -
- get instance from SDK: -
- get instance from GCE: -
Assert End:
async def assertion():
gce_id = json.loads(Response.body.decode("utf-8"))["instance_info"]["id"]
# fakerからインスタンスIDの情報を取得できているか確認
assert gce_id == "111", "%r" % Response.body.decode("utf-8")
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
Testcase②
Test Target Type: Scenario
name: DetectNoSystemFailure
category: SampleDag
target: DetectSystemFailure
illusion: DetectNoSystemFailure
Preparation:
async def preparation():
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName="test-instance",
gceId="111",
machineType="e2-medium"
)
# データベースに保存する
await test_instance.save()
Assert Begin:
async def hoge(*args, **kwargs):
pass
Scenario Input
- method: GET
- path: /sample_dag/instances/111/detect
- headers:
{
"Content-Type": "application/json"
}
Scenario Input(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Scenario Output:
async def assertion():
assert Response.code == 200, "Invalid response code %r" % Response.code
Scenario Output(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Assert End:
async def assertion():
cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
# causeに障害が検知出来ていない旨(null)が設定されているか確認
assert cause == "null", "%r" % Response.body.decode("utf-8")
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
Testcase③
Test Target Type: Scenario
name: DetectInstanceNotFound
category: SampleDag
target: DetectSystemFailure
illusion: DetectInstanceNotFound
Preparation:
async def preparation():
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName="test-instance",
gceId="111",
machineType="e2-medium"
)
# データベースに保存する
await test_instance.save()
Assert Begin:
async def hoge(*args, **kwargs):
pass
Scenario Input
- method: GET
- path: /sample_dag/instances/111/detect
- headers:
{
"Content-Type": "application/json"
}
Scenario Input(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Scenario Output:
async def assertion():
assert Response.code == 200, "Invalid response code %r" % Response.code
Scenario Output(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Assert End:
async def assertion():
cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
dag_instance_id = json.loads(Response.body.decode("utf-8"))["detect_result"]["dag_instance_id"]
# 障害理由と作成されたDAGのinstanceIdを確認
assert cause == "Instance not found", "%r" % Response.body.decode("utf-8")
assert dag_instance_id == "111", "%r" % Response.body.decode("utf-8")
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
Testcase④
Test Target Type: Scenario
name: DetectGCEFailure
category: SampleDag
target: DetectSystemFailure
illusion: DetectGCEFailure
Preparation:
async def preparation():
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName="test-instance",
gceId="111",
machineType="e2-medium"
)
# データベースに保存する
await test_instance.save()
Assert Begin:
async def hoge(*args, **kwargs):
pass
Scenario Input
- method: GET
- path: /sample_dag/instances/111/detect
- headers:
{
"Content-Type": "application/json"
}
Scenario Input(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Scenario Output:
async def assertion():
assert Response.code == 200, "Invalid response code %r" % Response.code
Scenario Output(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Assert End:
async def assertion():
cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
dag_instance_id = json.loads(Response.body.decode("utf-8"))["detect_result"]["dag_instance_id"]
# 障害理由と作成されたDAGのinstanceIdを確認
assert cause == "Failure in GCE system", "%r" % Response.body.decode("utf-8")
assert dag_instance_id == "111", "%r" % Response.body.decode("utf-8")
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
Testcase⑤
Test Target Type: Scenario
name: DetectMachineTypeMismatch
category: SampleDag
target: DetectSystemFailure
illusion: DetectMachineTypeMismatch
Preparation:
async def preparation():
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName="test-instance",
gceId="111",
machineType="e2-medium"
)
# データベースに保存する
await test_instance.save()
Assert Begin:
async def hoge(*args, **kwargs):
pass
Scenario Input
- method: GET
- path: /sample_dag/instances/111/detect
- headers:
{
"Content-Type": "application/json"
}
Scenario Input(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Scenario Output:
async def assertion():
assert Response.code == 200, "Invalid response code %r" % Response.code
Scenario Output(script)
- get instance from SDK: -
- get instance from GCE: -
- detect failure: -
- start dag workflow: -
Assert End:
async def assertion():
cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
dag_instance_id = json.loads(Response.body.decode("utf-8"))["detect_result"]["dag_instance_id"]
# 障害理由と作成されたDAGのinstanceIdを確認
assert cause == "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType", "%r" % Response.body.decode("utf-8")
assert dag_instance_id == "111", "%r" % Response.body.decode("utf-8")
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
以上でScenarioに対するTestcaseの実装は完了です。
次にDAGのワークフローに対するTestcaseの実装に入ります。
Scenario同様に[Unit Test as a Service]>[Test Case]>[Create New Testcase]
からTestcaseを作成することができます。
以下の画面でTest Target Type
にDag
を設定してください。
次にTestcaseのname
、category
、target
を入力します。
target
には、今回テストしたいDAGを選択してください。
まずはGCEにVMインスタンスが存在しない場合のTestcaseを作成するので、以下を入力してください。
name: RecoveryInstanceNotFound
caterogory: SampleDag
target: RecoveryWork
次にillusionの設定ですが、scenarioのTestcase同様にTestcase Setting
から以下を設定します。
illusion:RecoveryInstanceNotFound
次にmetadata
の設定です。
Testcase Setting
タブのmetadata
に以下を設定します。
ここで設定したmetadata
はDAGのテストの際に初期のmetadata
としてDAGのインスタンスに受け渡されます。
{
"instance_name": "test-instance",
"cause": "Instance not found",
"user_name": "user",
"operator_name": "operator",
"msg_type": "error",
"resolution": "Create the GCE instance",
"gce_id": "111",
"dag_instance_id": "1"
}
次にFlow Setting
タブを開いてください。
Note
それぞれの項目の説明は以下になります。
-
Preparation(準備)
テスト実行前の初期化や環境設定を行います。たとえば、テスト対象のデータやモックのセットアップを行います。 -
Assertion(テスト結果への検証)
DAGのテスト実行後の応答結果や状態変化を確認・記録します。出力内容が期待通りかをチェックするための処理です。 -
Cleanup(後処理)
テスト実行後に環境を元に戻すための処理です。たとえば、作成したテストデータの削除やリソースの解放などを行います。
また、Preparation
とAssertion
の中間のプラスボタンをクリックするとFlow Setting
の画面は以下のようになります。
新たに出現した設定項目ではDAGのワークフローへの中間処理(Taskの承認
、Assertion
等)を設定することができます。
Note
それぞれの項目の説明は以下になります。
-
対象Task
中間処理を実施するトリガーは対象タスクが特定のStatusに遷移した時のため、本項目ではトリガーとなるTaskを指定できます。 -
TaskのStatus
本項目では中間処理を実行するためのトリガーとなるTaskのStatusを設定できます。 DAGのTaskにはStatusが存在し、ワークフローが実行される中で複数の状態に遷移します(プログラミングガイド参照)。 -
interaction
中間処理を設定できます。 中間処理ではワークフローの途中結果の検証や、承認待ちTaskへの承認などができます。
今回のTestcase
ではFlow Setting
を以下のように設定します。
Preparation:
async def preparation():
gce_id = "111"
instance_name = "test-instance"
machineType = "e2-medium"
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName=instance_name,
gceId=gce_id,
machineType=machineType
)
# データベースに保存する
await test_instance.save()
Assertion for Tasks
- target: task03
status: ApprovalPending
interaction:
async def interaction():
await dagInstance.approval(["task_03"], name="operator", withRun=True, _illusion = "RecoveryInstanceNotFound")
- target: task08
status: ApprovalPending
interaction:
async def interaction():
await dagInstance.approval(["task_08"], name="operator", withRun=True, _illusion = "RecoveryInstanceNotFound")
Note
task03
、task08
はoperatorによる承認待ちタスクのため、StatusがApprovalPending
(承認待ち)の時、承認を実行する。
引数のwithRun
はTaskの承認と同時にワークフローの再開を行う場合にTrueに設定する。
Assertion:
async def assertion():
# DAGインスタンスのワークフローが実装されたか確認
assert dagInstance.status == "Done", "%r" % dagInstance.status
# ATOMインスタンスのgce_idが更新されているか確認
target_instances = await atom.VmInstance.retrieve(gceId="222")
target_instance = target_instances[0]
assert target_instance, "%r" % target_instance
# Ticketインスタンスのstatusが"resolve"になっているか確認
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
ticket_instance = ticket_instances[0]
assert ticket_instance.status == "Resolve", "%r" % ticket_instance
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="222")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="222")
assert test_instances == [], "Test instance exists"
# テスト用のTicketInstanceを削除する
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
ticket_instance = ticket_instances[0]
await ticket_instance.destroy()
# テスト用のTicketInstanceの削除を確認する
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
assert ticket_instances == [], "Ticket instance exists"
# テスト用のDAGInstanceを削除する
await dagInstance.destroy()
dag_instance = await DAGInstance.load(dagInstance.instance, noException=True)
assert dag_instance is None, "Dag instance exists"
残り2つのTestcaseも以下の設定を参考に作成してください。
Testcase②
name: RecoveryGCEFailure
caterogory:SampleDag
target: RecoveryWork
illusion:RecoveryGCEFailure
metadata:
{
"instance_name": "test-instance",
"cause": "Failure in GCE system",
"user_name": "user",
"operator_name": "operator",
"msg_type": "error",
"resolution": "Change status of VmInstance to GCE SYSTEM ERROR",
"gce_id": "111",
"dag_instance_id": "1"
}
Preparation:
async def preparation():
gce_id = "111"
instance_name = "test-instance"
machineType = "e2-medium"
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName=instance_name,
gceId=gce_id,
machineType=machineType
)
# データベースに保存する
await test_instance.save()
Assertion for Tasks
- target: task03
status: ApprovalPending
interaction:
async def interaction():
await dagInstance.approval(["task_03"], name="operator", withRun=True, _illusion = "RecoveryGCEFailure")
- target: task08
status: ApprovalPending
interaction:
async def interaction():
await dagInstance.approval(["task_08"], name="operator", withRun=True, _illusion = "RecoveryGCEFailure")
Assertion:
async def assertion():
# DAGインスタンスのワークフローが実装されたか確認
assert dagInstance.status == "Done", "%r" % dagInstance.status
# ATOMインスタンスのstatusが更新されているか確認
target_instances = await atom.VmInstance.retrieve(gceId="111")
target_instance = target_instances[0]
assert target_instance.status == "GCE SYSTEM ERROR", "%r" % target_instance
# Ticketインスタンスのstatusが"resolve"になっているか確認
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
ticket_instance = ticket_instances[0]
assert ticket_instance.status == "Resolve", "%r" % ticket_instance
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
# テスト用のTicketInstanceを削除する
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
ticket_instance = ticket_instances[0]
await ticket_instance.destroy()
# テスト用のTicketInstanceの削除を確認する
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
assert ticket_instances == [], "Ticket instance exists"
# テスト用のDAGInstanceを削除する
await dagInstance.destroy()
dag_instance = await DAGInstance.load(dagInstance.instance, noException=True)
assert dag_instance is None, "Dag instance exists"
Testcase③
name: RecoveryMachineTypeMismatch
caterogory:SampleDag
target: RecoveryWork
illusion:RecoveryMachineTypeMismatch
metadata:
{
"instance_name": "test-instance",
"cause": "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType",
"user_name": "user",
"operator_name": "operator",
"msg_type": "error",
"resolution": "Update machineType of GCE instance",
"gce_id": "111",
"dag_instance_id": "1"
}
Preparation:
async def preparation():
gce_id = "111"
instance_name = "test-instance"
machineType = "e2-medium"
# テスト用のVmInstanceクラスのインスタンスを作成する
test_instance = await atom.VmInstance(
instanceName=instance_name,
gceId=gce_id,
machineType=machineType
)
# データベースに保存する
await test_instance.save()
Assertion for Tasks
- target: task03
status: ApprovalPending
interaction:
async def interaction():
await dagInstance.approval(["task_03"], name="operator", withRun=True, _illusion = "RecoveryMachineTypeMismatch")
- target: task08
status: ApprovalPending
interaction:
async def interaction():
await dagInstance.approval(["task_08"], name="operator", withRun=True, _illusion = "RecoveryMachineTypeMismatch")
Assertion:
async def assertion():
# DAGインスタンスのワークフローが実装されたか確認
assert dagInstance.status == "Done", "%r" % dagInstance.status
# Ticketインスタンスのstatusが"resolve"になっているか確認
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
ticket_instance = ticket_instances[0]
assert ticket_instance.cause == "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType", "%r" % ticket_instance
assert ticket_instance.status == "Resolve", "%r" % ticket_instance
Cleanup:
async def cleanup():
# テスト用のInstanceを削除する
test_instances = await atom.VmInstance.retrieve(gceId="111")
test_instance = test_instances[0]
await test_instance.destroy()
# テスト用のInstanceの削除を確認する
test_instances = await atom.VmInstance.retrieve(gceId="111")
assert test_instances == [], "Test instance exists"
# テスト用のTicketInstanceを削除する
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
ticket_instance = ticket_instances[0]
await ticket_instance.destroy()
# テスト用のTicketInstanceの削除を確認する
ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
assert ticket_instances == [], "Ticket instance exists"
# テスト用のDAGInstanceを削除する
await dagInstance.destroy()
dag_instance = await DAGInstance.load(dagInstance.instance, noException=True)
assert dag_instance is None, "Dag instance exists"
以上でScenarioとDAGに対するTestcaseの実装が完了です。
UnitTest(単体試験)
最後に作成したテストを実行してみます。
[Unit Test as a Service]>[Unit Test]
を開きます。
UnitTestの詳細な使用方法の説明はAPI開発チュートリアルに記載のため省略します。
今回作成した以下8つのTestcaseを実行し、Test進捗が100%になることを確認してください。
Test Case | 対象Service | 想定するケース |
---|---|---|
GetInstanceSuccess | Scenario | GCEへのインスタンス情報取得APIが成功するケース |
DetectNoSystemFailure | Scenario | 障害検知APIで障害が検知されないケース |
DetectInstanceNotFound | Scenario | 障害検知APIでGCE上にVMインスタンスが確認できない障害を検知するケース |
DetectGCEFailure | Scenario | 障害検知APIでGCEのシステム自体の障害を検知するケース |
DetectMachineTypeMismatch | Scenario | 障害検知APIでGCEとSDKでMachineTypeの情報が一致しない障害を検知するケース |
RecoveryInstanceNotFound | DAG | DAGによるGCEに指定のインスタンスが存在しない障害へ復旧対応する場合 |
RecoveryGCEFilure | DAG | DAGによるGCEのシステム自体の障害へ復旧対応する場合 |
RecoveryMachineTypeMismatch | DAG | DAGによるGCEとSDKでMachineTypeの情報が一致しない障害へ復旧対応する場合 |
以上で単体試験が終了になります。
結合試験が実施できない方は以上でDAGのチュートリアルは修了になりますが、結合試験ではDAGのワークフローやTaskの状態遷移の様子を理解することができるため、目を通すことを推奨しています。
結合試験
結合試験では実際に障害を発生させ、Scenarioによる障害検知からDAGによる障害復旧までの一連の処理が正常に動作するかを確認します。
再掲になりますが、結合試験ではFakerやIllusionを使用しないため、以下ができることが前提条件となります。
- Google CloudでComputeEngineの権限を持つサービスアカウントを作成している。
- Google CloudのCompute Engine APIを用いてCompute EngineインスタンスをCRUDすることができる。
- SlackのAppを作成し、WorkSpaceにインストールできる。
GCEにVMインスタンスがない場合
まずはSDK内のATOMインスタンスにはVMインスタンス情報があるが、GCE上には存在しない場合の結合試験を行います。
試験準備
まず実際に障害を発生させるための準備を行います。
準備としてはSDK内にのみVMインスタンス情報を作成します。
[Workflow Scenario as a Service]>[Scenario]
を開いてください。
Interactive Shellから以下のコードを入力してください。
# GCEのidを111とするATOMインスタンスを作成(instanceNameはまだGCEに作成されていない名前であればOK)
>>> test_instance = await atom.VmInstance(instanceName="test-instance",gceId="111")↵
... ↵
↵
# 作成したインスタンス情報を確認
>>> print(test_instance)↵
... ↵
↵
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='111')
# インスタンスを保存
>>> await test_instance.save()↵
>>> ↵
↵
# GCEのidをkeyにインスタンスを検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="111")↵
... ↵
↵
>>> print(test_instances)↵
... ↵
↵
# 取得できたインスタンスの個数が1つであればOK
[VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='111')]
# instanceNameをkeyにインスタンス検索
>>> test_instances = await atom.VmInstance.retrieve(instanceName="test-instance")↵
... ↵
↵
# 取得できたインスタンスの個数がgceId='111'の1つのみであればOK
[VmInstance(instance='Vm1JbnN0YW5jZTo4MTY1ODc2ZTljZGExMWYwYTliZTI2NDY3MTA0YWNiNg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='111')]
対象のインスタンスが複数存在する場合には、プログラミングガイド>Scenario>ATOMを参考にインスタンスを削除してください。
以上により、SDK内にはインスタンス情報が存在するが、GCE上にはVMが存在していないというシチュエーションを再現できました。
結合試験
それでは障害検知のScenarioをTry API Call
から呼び出しましょう。
url resource
にはSDK内に作成したインスタンスのgce_id
である111
を設定してください。
APIを叩くと、障害を検知したScenarioがDAGのワークフローを実行します。
ワークフローではまずoperatorとuserへ通知を行うので、Slack通知を受信したか確認しましょう。
User向けのチャンネルでは以下のようなメッセージが送信されます。
Operator向けのチャンネルでは以下のようなメッセージが送信されます。
現在DAGのワークフローは以下のようになっています。
Operatorによる承認待ちをしている状態なので、通知を受け取ったOperatorはDAGのインスタンスを確認します。
通知に記載のDAGInstanceId
をコピーし、[Workflow Scenario as a Service]>[DAG]
を開きます。(インスタンス情報を確認する場合はControl Panel
のDAGになります。)
左上の検索窓にコピーしたDAGInstanceId
を入力し検索すると以下の画像のようになります。
Note
画面の説明をします。
-
赤枠:DAGインスタンスの各Taskのstatusやワークフローの進行状況をグラフィカルに確認することができます。
-
緑枠:各Taskの状態遷移を時系列で確認できます。
-
青枠:Taskの状態遷移と色の関係を確認できます。
DAGのワークフローの3つの並列TaskはDone
になっていることが確認できます。
また、現在はOperatorの承認待ちTaskがApprovalPending
になっていることを確認できます。
もし、Taskが失敗した場合、以下のようになります。
次にSlack通知の他にシステム障害のTicketが作成されているはずなので確認してみましょう。
画面右上の再生マークをクリックしてください。
すると、以下のように現在のmetadata
が確認できます。(一度画面を更新する必要があるかもしれません)
metadata
の項目にticket_id
が記載されているため、コピーしてください。
次にInteractive Shellにて以下のコードを入力してください。
# ticket_idでTicket検索
>>> tickets = await atom.TicketInstance.retrieve(ticketId="175")↵
... ↵
>>> print(tickets)↵
... ↵
↵
# 今回作成されたTicketはstatusがNewでcause, resolutionが今回の障害に対応指定ればOK
[TicketInstance(instance='VGlja2V0SW5zdGFuY2U6MzdhMGU4ZjQ3MzY3MTFmMDhhZjczNmU4MGU1ZjI3ZWI=', xid=None, xname=None, ticketId=175, status='New', instanceName='test-instance', cause='Instance not found', resolution='Create the GCE instance')]
それではワークフローに戻ります。
現在Operatorによる承認待ちなので、承認をします。
以下のように承認待ちTaskをクリックし、Approval
ボタンをクリックします。
以下のような画面になり、承認時に渡せるmetadata
を設定できます。
承認待ちTaskでは、承認者の名前を受け取るため以下のmetadata
を設定し、Approve
ボタンをクリックしましょう。
{
"name":"operator"
}
承認は実施しましたが、ワークフロー自体は再開していません。
なので、右上の再生マークをクリックし、Run
ボタンをクリックしてください。
ワークフローが再開されたことで、復旧作業が開始されます。
今回の復旧作業はGCEにインスタンスを作成するため、以下のワークフロー図のTask04
が動作し、他の復旧作業はSkipされました。
Caution
Taskの中で復旧作業がSkipされただけで、Task自体はSkipされていないことに注意
復旧作業終了後、DAGのワークフローは以下のような状態になります。
Operatorへの復旧確認通知が送信され、Operatorによる復旧作業承認待ちとなっています。
Operator向けのSlackチャンネルには以下のメッセージが送信されているか確認してください。
復旧確認通知ではMsgTypeがrecovery
になっているはずです。
次に復旧作業が正しく行われたか、確認します。
まずは手元のPCからgcloud
コマンドを利用して、GCEからインスタンス情報を取得します。
# 以下のコマンドでVM情報の取得(instance_nameとzoneは自身で記載)
gcloud compute instances describe <instance_name> --zone=<zone>
上記のコマンドでVM情報が出力された場合、GCEにVMを作成することができている。
取得できたVM情報からインスタンスIDをコピーしてください。
(gcloud
が使用できない場合、Google Cloudのコンソール上から確認してください。)
GCE上にインスタンスが作成されたことが確認できた後、SDK内の既存のVMインスタンスのインスタンスID(gceId)
が更新されているかを確認します。
Interactive Shellにて以下のコードを入力してください。
# Google CloudのコンソールからコピーしたインスタンスIDで検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="*****")↵
... ↵
↵
>>> print(test_instances)↵
... ↵
↵
# 元々のATOMインスタンスのinstanceIdと同じであればOK(復旧作業中にgceIdが更新されている)
[VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='*********', zone='*********', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='*******')]
>>>
# 初期のgceIdは「111」
復旧作業の確認ができたらOperatorによる承認を行い、ワークフローを再開します。(承認時、承認者のname
をmetadata
に設定してください)
再開後、DAGのインスタンスは以下のようになります。
全てのTaskがDone
に遷移したことが確認できます。
また、UserへのSlack通知も確認してください。
最後にTicketが更新されたかを確認します。
metadata
からticket_id
をコピーし、Interacitve Shellから以下のコードを入力します。
# ticketIdから対象のTicketを検索します。
>>> ticket = await atom.TicketInstance.retrieve(ticketId="175")↵
... ↵
↵
>>> print(ticket)↵
... ↵
↵
# statusがResolveになっていればOK
[TicketInstance(instance='VGlja2V0SW5zdGFuY2U6MzdhMGU4ZjQ3MzY3MTFmMDhhZjczNmU4MGU1ZjI3ZWI=', xid=None, xname=None, ticketId=175, status='Resolve', instanceName='test-instance', cause='Instance not found', resolution='Create the GCE instance')]
>>>
以上で本ケースの結合試験が終了です。
GCEのシステム自体に障害が発生する場合
試験準備
本試験では「GCEにVMインスタンスがない場合」の試験で作成したATOMインスタンスを使用するため、削除した場合は再作成してください。
また、インスタンスのgceId
をメモしてください。
本結合試験ではGCEシステム自体の障害を再現しなければなりません。
しかし、再現は難しいので障害検知ScenarioでIllusion
を使用します。
新しく以下のIllusion
を作成してください。
name | category | fakes |
---|---|---|
TestGCEFailure | SampleDag | getInstanceByScenario - GCEFailure |
結合試験
それでは障害検知のScenarioをTry API Call
から呼び出しましょう。
url resource
にはインスタンスのgce_id
を設定してください。
またIllusion
を使用するためにHTTP HederのX-Xaas-Illusion
にTestGCEFailure
を設定してください。
Execute Debug
をクリックし、APIを呼び出します。
障害を検知し、DAGのインスタンスが作成・実行されUserとOperator宛てにSlack通知が飛んでいるはずなので確認してください。
また、通知内容のcause(障害原因)、resolution(対応策)なども確認してください。
Operatorには作成されたDagのInstanceId
が通知されているので、コピーし、Control Panelでインスタンスを確認してください。
DAGのワークフローはOperatorの承認待ちになっていることが確認できます。
「GCEにVMインスタンスがない場合」の結合試験同様に新しくTicketが作成されていることを確認し、確認できたら承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)
承認後、復旧作業が開始されます。
本障害では、復旧作業(障害対処)として、GCEのシステムに障害がある旨をSDK内のATOMインスタンスのstatusに設定します。
Interactive Shellで以下のコードを入力し、障害復旧(障害対処)が完了しているか確認します。
# 対象インスタンスのgceIdで検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="*****")↵
... ↵
↵
>>> print(test_instances)↵
... ↵
↵
# statusにGCE SYSTEM ERRORが入力されていればOK
[VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='*******', zone='*********', machineType='e2-medium', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='4841664086913176400')]
>>>
復旧作業が確認できたら、OperatorによるTaskの承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)
「GCEにVMインスタンスがない場合」の結合試験同様にUserへの復旧完了通知とTicketの更新が正しく行われたか確認してください。
以上で本ケースの結合試験が終了です。
GCEとSDKでインスタンスのMachineTypeが異なる場合
試験準備
本試験では「GCEにVMインスタンスがない場合」の試験で作成したGCEのインスタンスとATOMインスタンスを使用するため、削除した場合は再作成してください。(GCEのインスタンスIDとATOMインスタンスのgceId
は同じ値である必要があります)
また、インスタンスのgceId
をメモしてください。
本結合試験ではSDK内のMachineType
情報を手動で変更することで、障害を再現します。
InteractiveShellに以下のコードを入力し、MachineType
を変更してください。
# gceIdで対象インスタンスを検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="******")↵
... ↵
↵
>>> test_instance = test_instances[0]↵
... ↵
↵
>>> print(test_instance)↵
... ↵
↵
# machineTypeがe2-mediumであることを確認
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='******', zone='*********', machineType='e2-medium', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='********')
# machineTypeをe2-smallに更新
>>> test_instance.machineType = "e2-small"↵
... ↵
↵
>>> print(test_instance)↵
... ↵
↵
# machineTypeがe2-smallに更新されていることを確認
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='******', zone='*********', machineType='e2-small', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='*******')
# インスタンスを保存
>>> await test_instance.save()↵
... ↵
↵
# 以下保存されていることを確認
>>> test_instances = await atom.VmInstance.retrieve(gceId="*******")↵
... ↵
↵
>>> test_instance = test_instances[0]↵
... ↵
↵
>>> print(test_instance)↵
... ↵
↵
# machineTypeがe2-smallであればOK
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='***********', zone='***********', machineType='e2-small', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='4841664086913176400')
>>>
gcloud
コマンドにより、GCEのVM情報も確認しましょう。(gcloud
が利用できない人はGCEのコンソール画面から確認してください)
gcloud compute instances describe <instance_name> --zone=<zone>
<省略>
machineType: https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-medium
<省略>
上記の出力結果よりmachineType
がe2-medium
であることがわかり、先ほどのSDK内のATOMインスタンスの変更により、SDKとGCEのmachineTypeが異なることが確認できます。
結合試験
それでは障害検知のScenarioをTry API Call
から呼び出しましょう。
url resource
にはインスタンスのgce_id
を設定してください。
Execute Debug
をクリックし、APIを呼び出します。
Scenario実行後、障害を検知し、DAGのインスタンスが作成・実行されUserとOperator宛てにSlack通知が飛んでいるはずなので確認してください。
また、通知内容のcause(障害原因)、resolution(解決策)なども確認してください。
Operatorには作成されたDAGのInstanceIdが通知されているので、コピーし、Control Panelでインスタンスを確認してください。
DAGのワークフローはOperatorの承認待ちになっていることが確認できます。
「GCEにVMインスタンスがない場合」の結合試験同様に新しくTicketが作成されていることを確認し、確認できたら承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)
承認後、復旧作業が開始されます。
本障害では、復旧作業として、SDKのMachineTypeを正しいとして、GCEのMachineTypeを更新します。
復旧作業の確認として、gcloud
コマンドを利用してVM情報を取得します。(コンソール上の確認でも可)
gcloud compute instances describe <instance_name> --zone=<zone>
<省略>
machineType: https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-small
<省略>
上記の出力結果から、復旧後にmachineTypeがSDK内のインスタンス情報と同じe2-small
に変更されていることが確認できます。
GoogleCloudのコンソールからもMachineTypeがe2-small
に更新されていることを確認できます。
復旧作業が確認できたら、OperatorによるTaskの承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)
「GCEにVMインスタンスがない場合」の結合試験同様にUserへの復旧完了通知とTicketの更新が正しく行われたか確認してください。
以上で本ケースの結合試験が終了です。
ーー 以上、DAG開発のチュートリアルになります。