DAG開発チュートリアル

はじめに

このチュートリアルでは、Qmonus SDKのScenarioサービスの機能であるDAGの開発を体験します。
DAGを使用したユースケースの一つであるZTO(Zero Touch Operation)を題材に取り上げ、実際にGoogle CloudのComputeEngineインスタンスに障害が起きた時の障害対応、復旧作業を行うデモシステムを開発します。

DAGとは?

DAG(Directed Acyclic Graph)は、有向非巡回グラフ型のワークフローモデルです。
DAGは、処理タスク間の依存関係を定義したワークフローで以下のようなイメージになります。この例では、DAGフローが開始されるとtask-atask-bが並列実行され、双方が完了するとtask-cが実行されます。task-cが完了するとtask-dtask-eが並列実行され、task-dが完了するとtask-fを実行して終了します。

API開発チュートリアルで実装したScenarioとの違いに関しては以下の2点になります。

  • 有向非巡回グラフ型 であること
    • Scenarioは定義されたToyblockが順番通りに処理されるが、DAGは複数のタスクを並列に処理させることができる
  • APIでは直接呼び出せない
    • ScenarioはAPI pathを定義し、API GatewayにRoutingを設定することで、APIから呼び出せます。
    • DAGはAPIの窓口を持たないため、ScenarioやFunctionの中でSDKの組み込み関数から呼び出します。

ZTOとは?

本チュートリアルの題材となるZTOとはNTTドコモがサービスの保守(故障復旧)業務の完全自動化を目指す仕組みです。
現在、多くのシステムの保守業務は以下のように人手を介した保守業務が中心です。

イメージ図では一連の流れとして表現されていますが、実際の障害対応においては、複数の宛先への報告が必要となるほか、原因調査においても多角的な視点からの切り分けが求められます。
そのため、一つの業務であっても、実際には多くのタスクが人手によって処理されているのが現状です。
この業務を全て自動化するというのがZTOの目標となります。

ZTOの実現に向けて、Qmonus SDKのDAGが以下のように用いられています。

システムの障害点検で使用されているのはQmonus SDKのCLA(Closed Loop Automation)という機能です。
CLAによって検知された障害に対してあらかじめ作成したDAGのワークフローを走らせることで、人手を介さず障害対応を実施できます。
原因調査など複数のタスクの並列処理や、復旧作業後にTicket更新、復旧報告を行うなどタスク間に依存関係がある処理を実行するのにDAGは適しています。

チュートリアルを進める上での準備(前提)

本チュートリアルではAPI開発チュートリアルで開発したSample CRUD APIを一部使用しているため、事前にAPI開発チュートリアルを実施してください。

本チュートリアルでは、Google Cloud の Compute Engine インスタンスに対する障害対応を題材としています。 そのため、以下の操作が可能であれば、より実践的な内容として取り組むことができます(※ただし、これらができなくてもチュートリアルの単体試験までは進行可能です)。

  • Google CloudでComputeEngineの権限を持つサービスアカウントを作成している。
  • Google CloudのCompute Engine APIを用いてCompute EngineインスタンスをCRUDすることができる。
  • SlackのAppを作成し、WorkSpaceにインストールできる。

チュートリアルのシチュエーション

本チュートリアルでは、ZTOの考え方を取り入れた、障害対応自動化システムの開発を題材としたシナリオに基づき、デモシステムを構築します。

本シナリオには、以下の登場人物が存在します。

  • ユーザー
    • GCE上でVMインスタンスを利用しています。障害が発生するとシステム停止などの影響を受ける可能性があります。
  • オペレーター
    • ユーザーが使用している GCE の VM インスタンスを保守しています。障害発生時には、ユーザーへの報告や原因の切り分けなど、複数の手作業を伴う業務を行います。
  • 読者(あなた)
    • ZTOを実現するため、オペレーターの手作業を削減・自動化する保守支援システムの開発に取り組みます。

デモシステムの実装

ここから障害復旧システムの実装に進みます。

実装内容確認

本チュートリアルで実装する障害復旧システムのアーキテクチャは以下になります。

  • ATOM
    SDK内でのVMインスタンスの管理にはAPI開発チュートリアルで作成したVmInstanceを使用します。
    本チュートリアルでは、障害対応時のチケット管理を模擬するため、Ticket情報を管理する ATOM を追加で作成します。

  • Config
    SDK内部からCompute EngineのAPIを実行するにはサービスアカウントキー情報をConfigに保管する必要があります。
    こちらはAPI開発チュートリアルのものを使用するため実装は省略します。

  • API Gateway
    API GWにRoutingを設定します。
    Routing設定により、各ScenarioやDAGで実行されたAPIを、API GWがGoogle Cooudに対して適切にRoutingします。

  • Scenario
    本チュートリアルでは以下2つのScenarioを実装します。

    • GCE(Google Compute Engine)のVMインスタンス情報を取得するScenario
    • GCEのVMインスタンスの障害を検知し、DAGを呼び出して、障害対応を行うScenario
  • DAG
    本チュートリアルでは以下のDAGを実装します

    • Scenarioからの呼び出しで、ユーザーや オペレーターへの対応及び、検知された障害への対応を行うDAG

障害検知から障害対応までのデモシステムの動作イメージ

障害検知

本チュートリアルで実装するデモシステムの障害検知から障害対応における動作イメージを説明します。
前提としてオペレータはGoogle Cloud上にVMインスタンスを作成していて、SDK内のATOMにも対応するVMインスタンス情報が管理されています。

まずオペレーターはGCEのインスタンスに障害が発生していないか確認するため、検知Scenarioを実行します。(イメージ図の簡略化のためAPI GWは省略しています。)

検知ScenarioはGCEからVMインスタンス情報を取得するScenarioを利用してVM情報を取得します。 また、ATOMが保持している対象のVM情報を取得します。 ここで、GCEからVM情報の取得ができない場合や、GCEとSDK内のVM情報に誤差があった場合、障害として検知します。

本チュートリアルで扱う障害は以下3点になります。

  • SDK内ではVM情報が存在するが、Compute Engine上には存在しない場合

  • Compute Engineのシステム自体に障害が発生し、インスタンス情報が取得できない
    GCE自体に障害がある場合、インスタンス情報を取得するScenarioはAPIのレスポンスを確認できず、検知Scenarioにタイムアウト処理によるレスポンスを返却する。

  • GCEとSDK内のインスタンス情報(MachineType)が異なる

障害対応

障害を検知したScenarioは障害対応を行うためDAGのインスタンスを作成し、復旧に向けてDAGのワークフローを実行します。
ワークフローではオペレーターやユーザへの通知、Ticket作成、障害復旧などを行います。(詳細はDAGの実装に記載)
各障害への復旧(対応)作業は以下になります。

  • SDK内ではVM情報が存在するが、Compute Engine上には存在しない場合
    SDK内のインスタンス情報を参考に、CREATE API(API開発チュートリアルで作成済み)でGCE上にインスタンス作成

  • Compute Engineのシステム自体に障害が発生し、インスタンス情報を取得できない
    ATOMのインスタンス情報にGCEシステムの障害の旨を記録することで対応

  • GCEとSDK内のインスタンス情報(MachineType)が異なる
    SDK内のインスタンス情報を参考に、UPDATE API(API開発チュートリアルで作成済)でGCEのインスタンス情報を更新

上記が本チュートリアルで実装するデモシステムの動作イメージになります。

ATOMの実装

実装内容の確認

本チュートリアルでは以下の2つのATOMを実装します。

  • VmInstance(API開発チュートリアルで実装済)
    • GCEのインスタンス情報を所持している。
  • TicketInstance
    • 本チュートリアルで実装。
    • インスタンス障害検知時に、障害に関する情報を記録し管理するためのチケットを模擬したもの。

VmInstanceはすでにAPI開発チュートリアルで実装されているため、本チュートリアルではTicketInstanceを実装します。

ATOMの定義

クラスの作成 [Workflow Scenario as a Service]>[Class]>[Create New Class]からクラスを作成します。

入力は以下を参考にしてください。

項番 項目 入力値 説明
1 category SampleDag ATOMが属する分類を指定してください。この情報はクラスエディタでの階層表示のためのタグとしてのみ使用されます。
2 workspace 未指定 ワークスペースを設定します。
3 name TicketInstance ATOMの名前を指定します(クラス名と同義)。ユニークである必要があります。

メタ情報の定義

persistenceabstractapi_generationなどのその他メタ情報にはdefault値が設定されています。今回はdefaultの設定とします。

項番 項目 入力値 説明
1 persistence 選択(default) 永続化モードを指定します。選択した場合、ATOM定義を保存するとクラス構造に対応するデータベーステーブルが自動的に生成され、データベースに対するCRUDの組込みメソッドが実装されます。
2 abstract 未選択(default) 抽象クラスモードを指定します。
3 api_generation 選択(default) API自動生成モードを指定します。API自動生成モードでは、ATOMをCRUDするためのRestful APIが自動的に生成されます。

identifier fieldの定義

[attributes]>[identifier]に以下を入力します。

項番 項目 入力値 説明
1 title ticketId フィールド名を指定します
2 field_type integer フィールドの型を指定します。
3 field_persistence 選択(default) フィールドの永続化モードを指定します。選択しない場合は本フィールドはデータベースカラムとして生成されません。
4 field_immutable 選択(default) フィールドの値が不変かを指定します。選択した場合、本フィールドの値変更は許可されません。
5 field_metadata* {"POST": false} フィールドに任意のメタ情報を定義します。メタ情報はdict型のデータを設定してください。

local fieldsの定義

以下を入力します。

項番 項目 入力値 説明
1 field_name status identifier field参照
2 field_type [String]を選択(default) identifier field参照
3 field_persistence 選択(default) identifier field参照
4 field_nullable 選択(default) フィールドの値にNullを許容するかを指定します。
5 field_immutable 未選択(default) identifier field参照
6 field_unique 未選択(default) フィールドが複合ユニークインデックスの対象かを指定します。
7 field_format* '[a-zA-Z0-9]' フィールドのフォーマットをjsonschemaもしくは正規表現で指定します。
8 field_metadata* {"POST": true, "PUT": true} identifier field参照

同様に以下のlocal fieldsも定義します。

  • instanceName
  • cause
  • resolution

カウンタの作成

[Transaction as a Service]>[Transaction Settings]の画面右上にある[+]ボタンを押下すると、カウンタ作成画面が開きます。

以下の項目を入力してください。

項番 項目 入力値 説明
1 counter_type Number カウンタの種類を指定します。 Number、UUID、Inventoryの3つのタイプがあります。
2 workspace 未入力 ワークスペースを設定します。
3 counter_name ticketId カウンタを一意に識別する名前を指定します。
4 counter_format $ カウンタから返される値の形式を指定します。値は$で表されます。
5 min_num 1 カウンタの最小値を指定します。
6 max_num 9999 カウンタの最大値を指定します。
7 padding 選択しない ゼロパディングモードを指定します。

メソッド(instance_methods)の定義

以下を上記で定義したVmInstanceのinstance_methodsmethod_bodyに入力する。

async def initialize(self, *args, **kwargs):
    if not self.ticketId:
        self.ticketId = int(await Counter.allocate("ticketId"))

ATOMの確認

上記でATOMの用意ができました。
Interactive shellから、以下の入力をしてみて動作を確認してみましょう。

# ticketInstanceの作成
>>> ticket_instance = await atom.TicketInstance()↵
... ↵
↵
>>> print(ticket_instance)↵
... ↵
↵
# ticketIdが割り当てられ、インスタンスが作成されていることを確認
TicketInstance(instance='VGlja2V0SW5zdGFuY2U6ZmY0NDMzMzY2M2FjMTFmMDkyYjhhZTRlZDk2Njg1ZTQ=', xid=None, xname=None, ticketId=37, status=None, instanceName=None, cause=None, resolution=None)

Routingの実装

実装内容の確認

本チュートリアルで実装するRoutingは以下です。
/SampleDag/{projectId}/{zone}/{instanceName}https://compute.googleapis.com/compute/v1/projects/{projectId}/zones/{zone}/instances/{instanceName}

このRoutingはGCEのインスタンス情報をGETする際に使用します。

Routingの設定

API開発チュートリアルのRouting設定を参考に以下のパラメータを設定する。

項番 項目 入力値
1 workspace 未設定
2 domain default(デフォルト)
3 scheme(proxy) http(デフォルト)
4 path(proxy) /SampleDag/{projectId}/{zone}/{instanceName}
5 authority compute.googleapis.com
6 scheme(target) https
7 path(target) /compute/v1/projects/{projectId}/zones/{zone}/instances/{instanceName}

以上でRoutingの設定は終了です。

Scenarioの実装

実装内容確認

障害対応システムのScenario実装に進む前にSample CRUD APIについて確認し、再利用するものと新しく開発するものを確認をします。
Sample CRUD APIでは以下5つのScenarioを実装しました。

アクセストークン取得 : アクセストークンを取得する(本チュートリアルでも再利用)
CREATE : ComputeEngineのインスタンスを作成する(本チュートリアルでも再利用)
READ : SDKのATOMからComputeEngineインスタンスの情報を取得する
UPDATE : ComputeEngineのインスタンスのMachineTypeを更新する(本チュートリアルでも再利用)
DELETE : ComputeEngineのインスタンスを削除する

本チュートリアルでは以下の2つのScenarioを新たに実装します。

  • Comupute EngineのVMインスタンス情報を取得するScenario
  • ComputeEngineの障害を検知し、復旧のためのDAGを呼び出すScenario
    • Compute Engineのインスタンス情報を取得
    • 障害検知
    • 障害対応するためのDAGを呼び出し、インスタンスを作成、ワークフローの実行

Scenarioの作成

Comupute EngineのVMインスタンス情報を取得するScenarioの作成

以下のように[Workflow Scenario as a Service]>[Scenario]>[Create New Scenario]から新たなScenarioを作成します。

作成画面を開いた後、以下のScenario定義を設定し、Scenarioを作成します。

項番 項目 入力値
1 category SampleDag
2 workspace 未入力
3 name GetInstance
4 method GET
5 uri /sample_dag/instances/{gce_id}
6 transaction 未選択
7 routing_auto_generation_mode 選択

次にscriptブロックの追加を行う。

  • アクセストークンの取得

Display Label: get access token
Python code:

r = await callout(
    # API開発チュートリアルで作成したアクセストークンの取得APIを使用
    path="/sample_crud_api/get_access_token",
    method="GET"
    )

if r.error:
    raise Error(r.code, reason="GET failed")

access_token = json.loads(r.body)["access_token"]
  • SDK内のATOMインスタンスから対象のインスタンス情報を取得

Display Label: get instance from SDK
Python code:

gce_id = context.request.resources.gce_id

# 検知対象のインスタンス情報を取得する
get_instances = await atom.VmInstance.retrieve(gceId=gce_id)
get_instance = get_instances[0]
  • Compute Engineからインスタンス情報を取得

Display Label: get instance from GCE
Python code:

r = await callout(
    path=f"/SampleDag/{get_instance.projectId}/{get_instance.zone}/{get_instance.instanceName}",
    method="GET",
    headers={
        "Authorization": f"Bearer {access_token}"
    }
    )

instance_info = json.loads(r.body.decode("utf-8"))
context.session.finish({"instance_info": instance_info})

次にVariablesに以下の変数を設定する

  • get_instance
  • access_token

動作確認

CREATE APIを利用してGCE上にインスタンスを作成してください。 その後、Interactive shellで以下のようにインスタンス情報を確認します。

# VmInstance情報を全て取得
>>> instance = await atom.VmInstance.retrieve()↵
... ↵
↵
>>> print(instance)↵
... ↵
↵
[VmInstance(instance='Vm1JbnN0YW5jZTozNGI2ZDVjNDg5NGExMWYwOWMyM2Q2MzdmZDljNjEzYw==', xid=None, xname=None, instanceId=19, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status='RUNNING', gceId='********')]
>>>

GCE上のインスタンスID(gceId)が確認できたら、コピーする。
次にGetInstanceの「Try API Call」からurl resourcesにコピーしたgceIdを入力して、APIを呼び出す。 レスポンスでインスタンス情報が取得できれば正常に動作しています

ComputeEngineの障害を検知し、障害対応のためのDAGを呼び出すScenarioの作成

作成画面を開いたのち、以下のScenario定義を設定し、Scenarioを作成します。

項番 項目 入力値
1 category SampleDag
2 workspace 未入力
3 name DetectSystemFailure
4 method GET
5 uri /sample_dag/instances/{gce_id}/detect
6 transaction 未選択
7 routing_auto_generation_mode 選択

scriptブロックの追加

  • SDK内のATOMインスタンスから対象のインスタンス情報を取得

Display Label: get instance from SDK
Python code:

gce_id = context.request.resources.gce_id

# 検知対象のインスタンス情報を取得する
get_instances = await atom.VmInstance.retrieve(gceId=gce_id)
get_instance = get_instances[0]
  • Compute Engineからインスタンス情報を取得

Display Label: get instance from GCE
Python code:

# GCEのインスタンス情報を取得
r = await callout(
    path=f"/sample_dag/instances/{get_instance.gceId}",
    method="GET"
    )
response = r
  • 取得できたインスタンス情報から障害検知

Display Label: detect instance failure
Python code:

import os

# インスタンス情報取得時にerrorがあった場合は障害復旧
if response.code == 599:
    cause = "Failure in GCE system"
    resolution = "Change status of VmInstance"
else:
    instance_info = json.loads(response.body.decode("utf-8"))["instance_info"]
    if "error" in instance_info:
        response_code = instance_info["error"]["code"]
        # エラーコードレスポンスにより異常を検知し、workflowを実施
        if response_code == 404:
            cause = "Instance not found"
            resolution = "Create the GCE instance"
    else:
         # インスタンスのマシンタイプを取得する
        instance_machine_type = os.path.basename(instance_info["machineType"])

        # ATOMとGCEのmachineType情報を比較
        if get_instance.machineType != instance_machine_type:
            cause = "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType"
            resolution = "Update machineType of GCE instance"
        else:
            cause = "null"
            resolution = "null"
  • 障害復旧のためのDAGのインスタンス作成、ワークフロー実施

Display Label: start dag workflow
Python code:

if cause != "null":

    # DAGインスタンスの作成・ワークフロー開始
    dag_Instance_Id = await context.createDagInstance(dagName="RecoveryWork", gce_id=gce_id, cause=cause, user_name="user", operator_name="operator", msg_type="error", resolution=resolution)

    result = {
        "cause": cause,
        "dag_instance_id": dag_Instance_Id
    }

    context.session.finish({"detect_result": result})
else:
    result = {
        "cause": "null"
        }
    context.session.finish({"detect_result": result})

Caution

DAGインスタンスの作成・ワークフローの実行を行っている「createDagInstance」は Functionプラグインにて実装

Variablesの設定 以下の変数をVariablesに設定する。

  • get_instance
  • access_token
  • gce_id
  • resolution
  • cause
  • response

最後に本Scenarioのstart dag workflowで使用されている Functionを実装します。 Function[Workflow Scenario as a Service]>[Bultin]>[Create New Function]から新規作成する。 作成画面を開いたのち、以下の定義を設定し、新たなFunctionを作成します。

項目 入力値
category SampleDag
workspace 未入力
name createDagInstance

次にFuctionのスクリプトに以下を設定します。

async def createDagInstance(
    dagName: str,
    gce_id: str,
    cause: str,
    user_name: str,
    operator_name: str,
    msg_type: str,
    resolution: str
):
    # 対象のインスタンスを抽出
    get_instances = await atom.VmInstance.retrieve(gceId=gce_id)
    get_instance = get_instances[0]

    # DAGインスタンスの作成・ワークフロー開始
    dag = await DAG.load(dagName)
    dag_instance = dag.createInstance()
    dag_instance.run(
        gce_id=gce_id,
        instance_name=get_instance.instanceName,
        cause=cause,
        user_name=user_name,
        operator_name=operator_name,
        dag_instance_id=dag_instance.instance,
        msg_type=msg_type,
        resolution=resolution
        )

    return dag_instance.instance

Note

dag_instance.run()の引数について:
dag_instance.run()は生成されたDAGインスタンスのワークフローをスタートさせるためのメソッドです。
引数にはmetadata(詳細はDAGの実装に記載)を設定できます。

障害検知・対応のScenarioの動作確認はfakerや障害の模擬が必要になるため、UnitTestや結合試験の章で行います。

以上でScenarioの実装は終了です。

DAGの実装

実装内容確認

本チュートリアルで実装するDAGのワークフローは以下になります。

Scenarioによって障害が検知され、その際の障害の基本情報をmetadataとして受け取り、上記のワークフローが実行されます。

Note

metadataとは?:
DAGワークフローの全てのタスクが共用できるグローバル名前空間。
各Taskの実行時にmetadataに存在する変数が引数として自動的にバインドされます。
metadataは、DAG定義時、DAGInstance生成時、DAGInstance走行開始時に指定することが可能です。
各Taskでmetadataを辞書オブジェクトして返却すると、metadataにマージされ、後続のTaskでも利用が可能になります。

参考:プログラミングガイド>DAG>DAGのメタデータ

本チュートリアルでのDAGの役割としては以下の3つになります。

  • User対応
    • 障害・復旧通知
  • Operator対応
    • 障害・復旧通知、復旧作業確認、Ticket対応
  • 障害対応
    • 障害復旧

流れとしてはまず障害を検知した時点で、以下3つの処理を行います。

  • Userへの障害通知
  • Operatorへの障害および復旧作業確認通知
  • 障害を管理するためのTicket作成

次にDAGの承認待ち機能を活用して、復旧作業前にOperatorによる承認待ちを行います。
承認後、検知された障害に対応した復旧作業を実施し、復旧後にOperatorへ復旧内容と復旧確認のための通知をします。
通知を受け取ったOperatorは復旧が完了したかどうかを確認して、承認を行います。
承認後、Userへの復旧通知とTicketの更新が行われるという流れになります。

Note

承認待ち機能とは?:
DAGのTaskには承認待ち機能を持たせることができます。
承認待ち機能をもったTaskは利用者による承認行為がなされるまで、Pendingされます。

この承認待ち機能はZTOのユースケースでも利用されています。
最終的にはZTOによって全ての作業を自動化することが目標ですが、全てを自動化するにはステップが必要です。
例えば、大量のユーザを抱えたシステムにいきなりZTOシステムを導入した場合、障害復旧時に自動的に選択された復旧作業によってさらなる障害を招く可能性があります。

それらを防ぐため、この承認待ち機能が利用されます。
承認待ち機能によって、自動的に調査・選択された障害原因・対応が正常かどうか人の手で確認できます。
完全なZTOの実現の前段階として、承認待ち機能は有効です

本チュートリアルで対応する障害を改めて確認すると以下の3つになります。

障害内容 障害対応内容
SDK内ではVM情報が存在するが、Compute Engine上には存在しない場合 SDK内のインスタンス情報を参考に、CREATE API(API開発チュートリアルで作成済み)でGCE上にインスタンス作成
Compute Engineのシステム自体に障害が発生し、インスタンス情報が取得できない ATOMのインスタンス情報にGCEに発生したシステム障害の旨を記録することで対応
GCEとSDK内のインスタンス情報(MachineType)が異なる SDK内のインスタンス情報を参考に、UPDATE API(API開発チュートリアルで作成済)でGCEのインスタンス情報を更新

それではDAGの実装に進みます。

Slack Appの作成とWorkspaceへのインストール、チャンネルの作成

デモシステムに必要なSlack Appの作成とWorkspaceへのインストールを行います。
詳細な方法は各自で調査してください。

まずは障害や復旧の通知を行うためのSlack Appを作成しWorkspaceにインストールします。
権限としてはScopes>Bot Token Scopeschat:writeを設定します。
Workspaceにインストールできたらチャンネルの作成を行います。
今回使用するチャンネルはUser用とOperator用の2つのチャンネルを作成します。
チャンネルの作成後、チャンネルへAppを追加し、インストールしたAppをチャンネルに追加します。
本チュートリアルでは「dag_user」「dag_operator」というチャンネルを使用します。 上記の名前以外のチャンネルを使用する際には、Functionに自身が作成したチャンネル名を記載する必要があります。 最後にSDK内部からSlackのAppを使用するために必要なOAuth Tokens(xoxbから始まるもの)をメモします。

Functionプラグインの作成

DAGのワークフローを実装する前に、ワークフローのタスクに設定するFunctionプラグインを作成します。

Userへのslack通知用のFunctionの作成

まず[Workflow Scenario as a Service]>[Dag(Resources)]に移動します。
次に以下の画像のようにPluginCreate New Functionから新規Functionを作成します。

作成画面を開いたのち、以下の定義を設定し、新たなFunctionを作成します。

項目 入力値
category SampleDag
workspace 未入力
name notificationToUser

次にFuctionのスクリプトに以下を設定します。(必要に応じてOAuth tokenを書き換えてください)

import os

async def notificationToUser(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      cause: str,
      user_name: str,
      instance_name: str,
      msg_type: str
      ) -> dict:
          # slack通知に必要なパラメータの設定
          username=f"Qmonus {options.version} {os.uname()[1]}"
          send_chan = "#dag_user" # チャンネル名
          emoji=options.slack_notify_emoji
          color="#7CD197"
          token="xoxb-*****" # OAuth Token(自身が用意したTokenに書き換えてください)
          pretext="SampleDag"
          msg=f'To: {user_name}\n' + f'MsgType: {msg_type}\n' + f'InstanceName: {instance_name}\n' + f'cause: {cause}\n'
          params=dict(
            channel=send_chan,
            username=username.encode("utf-8"),
            icon_emoji=emoji,
            attachments=[
                dict(
                    fallback=username,
                    color=color,
                    pretext=pretext,
                    text=msg.encode("utf-8")
                )
            ]
          )

          # slack通知
          r=await callout(
              url="https://slack.com/api/chat.postMessage",
              headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
              method=POST,
              body=params
              )

          return {
               __identifier__: {
                "send_chan": send_chan,
                "user_name": user_name,
                "msg": msg
              }
          }

その他のFunctionの作成

同様に本チュートリアルで使用するFunctionを作成します。
以下の表を参考に7個のFunctionを作成してください。
categoryworkspacenotificationToUserと同様のものを設定してください。

name: notificationToOperator
code:

import os
async def notificationToOperator(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      cause: str,
      operator_name: str,
      instance_name: str,
      msg_type: str,
      resolution: str
      ) -> dict:
          # slack通知に必要なパラメータの設定
          username=f"Qmonus {options.version} {os.uname()[1]}"
          send_chan = "#dag_operator"  # チャンネル名
          emoji=options.slack_notify_emoji
          color="#7CD197"
          token="xoxb-*****"   # OAuth Token(自身が用意したTokenに書き換えてください)
          pretext="SampleDag"
          msg = f'To: {operator_name}\n' + f'MsgType: {msg_type}\n' + f'InstanceName: {instance_name}\n' + f'cause: {cause}\n' + f'DagInstanceId: {__instance__}\n' + f'Resolution: {resolution}\n'
          params=dict(
            channel=send_chan,
            username=username.encode("utf-8"),
            icon_emoji=emoji,
            attachments=[
                dict(
                    fallback=username,
                    color=color,
                    pretext=pretext,
                    text=msg.encode("utf-8")
                )
            ]
          )

          # slack通知(tokenには自身が作成したOAuth Tokensを設定)
          r = await callout(
              url="https://slack.com/api/chat.postMessage",
              headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
              method=POST,
              body=params
              )

          return {
               __identifier__: {
                "send_chan": send_chan,
                "operator_name": operator_name,
                "msg": msg
              }
          }

name: createTicketInstance
code:

async def createTicketInstance( 
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      cause: str,
      instance_name: str,
      resolution: str
      ) -> dict:
          # TicketInstanceの作成
          ticket_instance = await atom.TicketInstance(status = "New", cause = cause, instanceName = instance_name, resolution = resolution)
          await ticket_instance.save()

          return {
              __identifier__: {
                "ticket": ticket_instance
              },
              "ticket_id": ticket_instance.ticketId

          }

name: awaitApproval
code:

def awaitApproval(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      name: str
    ) -> dict:
        # 承認者の名前をmetadataに追加
        return {
             __identifier__: {
                "Approved person's name": name

             }

        }

name: createInstance
code:

async def createInstance(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      cause: str,
      instance_name: str,
      gce_id: str
      ) -> dict:

          # SDK内のインスタンスを取得
          target_instances = await atom.VmInstance.retrieve(gceId=gce_id)
          target_instance = target_instances[0]

          # GCEにインスタンスが存在しない障害か確認
          if cause == "Instance not found":
              # 削除されている場合、対象のインスタンスを作成
              r = await Scenario.run("InstanceCreate", method=POST, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName)

              CHECK_INTERVAL = 1
              TIMEOUT = 360
              # インスタンス作成のScenarioの処理が完了したかをポーリング
              for i in range (TIMEOUT):

                  get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)
                  instance_info=json.loads(get_instance.body.decode("utf-8"))["instance_info"]
                  if not ("error" in instance_info):
                      break

                  await asyncio.sleep(CHECK_INTERVAL)

              # ATOMのインスタンスのgceIdを更新
              target_instance.gceId = instance_info["id"]
              await target_instance.save()

              return {
                  __identifier__: {
                      "instance_info": instance_info
                  },
                  "msg_type" : "recovery"
              }
          else:
              return{
                   __identifier__: "not worked"
              }

name: updateMachineType
code:

import os
async def updateMachineType(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      cause: str,
      instance_name: str,
      gce_id: str
      ) -> dict:

          if cause == "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType":

              # SDK内のインスタンスを取得
              target_instances = await atom.VmInstance.retrieve(gceId=gce_id)
              target_instance = target_instances[0]

              # SDKとGCEのインスタンス情報でどこのfieldが異なるのか"cause"から確認
              target_field = cause.split(':')[1]

              if target_field == "machineType":
                  # SDKのインスタンス情報をもとにGCEインスタンスをアップデート
                  r = await Scenario.run("InstanceUpdate", method=PUT, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName, machine_type=target_instance.machineType)

                  CHECK_INTERVAL = 5
                  TIMEOUT = 120
                  # インスタンス作成のScenarioの処理が完了したかをポーリング
                  for i in range (TIMEOUT):
                      get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)
                      instance_info=json.loads(get_instance.body.decode("utf-8"))["instance_info"]
                      instance_machine_type = os.path.basename(instance_info["machineType"])
                      if instance_machine_type == target_instance.machineType:
                          break

                      await asyncio.sleep(CHECK_INTERVAL)

                  return {
                      __identifier__: {
                          "instance_info": target_instance
                      },
                      "msg_type" : "recovery"
                  }
          else:
              return{
                   __identifier__: "not worked"
              }

name: updateATOMInstance
code:

async def updateATOMInstance(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      cause: str,
      instance_name: str,
      gce_id: str
      ) -> dict:

          if cause == "Failure in GCE system":

              # SDKのインスタンスを取得
              target_instances = await atom.VmInstance.retrieve(gceId=gce_id)
              target_instance = target_instances[0]

              # インスタンスのstatus情報を設定し直す
              target_instance.status = "GCE SYSTEM ERROR"
              await target_instance.save()

              return {
                   __identifier__: {
                    "instance_info": target_instance
                  },
                  "msg_type" : "recovery"
              }
          else:
              return{
                   __identifier__: "not worked"
              }

name: updateTicketInstance
code:

async def updateTicketInstance(
      __instance__: str,
      __identifier__: str,
      __reverse__: bool,
      ticket_id: int,
      ) -> dict:

          # 対象のTicketInstanceの取り出し、statusをResolveに更新
          ticketInstances = await atom.TicketInstance.retrieve(ticketId=ticket_id)
          ticketInstance = ticketInstances[0]
          ticketInstance.status = "Resolve"

          await ticketInstance.save()
          return {
              __identifier__: {
                "update_ticket": ticketInstance
              }
          }

以上でDAGのワークフローに必要なfunctionの作成が終わりました。

ワークフローの作成

ここからDAGのワークフローの実装に入ります。
本チュートリアルで実装するワークフローは以下になります(再掲)

まずは[Workflow Scenario as a Service]>[Dag(Resources)]に移動します。
以下の画面のようにCreate DAGをクリックします。

作成画面を開いたのち、以下の定義を設定し、DAGを作成します。

項目 入力値
category SampleDag
workspace 未入力
name RecoveryWork
autoReverseOnFailure 未選択

以下の画像のように新しく作成したDAGが左側の一覧に追加されます。

それではDAGへタスクの追加を行なっていきます。
左側のDAG一覧からRecoveryWorkをクリックし、中央の画面をRecoveryWorkのワークフロー作成画面に遷移させる。 ワークフローの作成画面をクリックすると以下のようにCreate New Taskのボタンが生成されるので、クリックし作成画面に遷移する。

作成画面を開いたのち、以下の定義を設定し、タスクを作成します。

  • Userへの障害通知
項目 入力値 詳細
identifier task_00(デフォルト) DAGタスクの識別子です。
label Userへの障害通知 当該タスクの注釈や処理概要を記載できるユーザラベルです。
operator FunctionPluginOperator(デフォルト) DAGタスクで実行する処理を提供するオペレータを定義します。
1. FunctionPluginOperator: 特定のFunctionプラグインへの参照
2. ModulePluginOperator: 特定のModuleプラグインに含まれる特定の関数への参照
3. ThirdPartyPluginOperator: 特定の持ち込みpythonパッケージ内の特定モジュールに含まれる特定の関数への参照
4. DagPluginOperator: 特定のDAGプラグインへの参照
funcname notificationToUser DAGフローがdownstream方向に稼働している場合に当該タスクが実行する関数を指定します。
approvalRequired 未選択(デフォルト) 当該タスクの実行に際してユーザからの承認行為が必要か否かを指定するフラグです。デフォルトはFalseです。
idempotency 未選択(デフォルト) 当該タスクの冪等性の有無を宣言するフラグです。デフォルトはFalseです。
dependsOn 未選択(デフォルト) DAGタスクの依存先タスクの識別子のリストを指定します。

タスクを作成すると以下のような画面になり、ワークフローの作成画面にタスクが追加されます。

同様に以下2つのタスクを追加します。

  • Operatorへの障害通知
項目 入力値
identifier task_01(デフォルト)
label Operatorへの障害通知と復旧作業確認
operator FunctionPluginOperator(デフォルト)
funcname notificationToOperator
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn 未選択(デフォルト)
  • 障害管理のためのTicket作成
項目 入力値
identifier task_02(デフォルト)
label Ticket作成
operator FunctionPluginOperator(デフォルト)
funcname createTicketInstance
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn 未選択(デフォルト)

上記3つのタスクを作成した後のワークフロー作成画面は以下のようになります。

これら3つのタスクには依存関係がないので、障害発生時に並列で行われます。

  • Operatorによる復旧承認待ち処理
    本チュートリアルのDAGでは障害の発生と復旧作業の通知を受けたOperatorが復旧作業を開始するための承認を行います
    その承認待ちを行うタスクを以下の定義から作成します
項目 入力値
identifier task_03(デフォルト)
label Operatorの承認待ち
operator FunctionPluginOperator(デフォルト)
funcname awaitApproval
approvalRequired 選択
idempotency 未選択(デフォルト)
dependsOn task_00, task_01, task_02

上記のタスクを作成すると、ワークフローの全体像は以下のようになります。

dependsOnのパラメータにタスクを設定したことで、タスク同士の依存関係が存在するワークフローになりました。
また、本タスクはapprovalRequiredを選択し、承認が必要なタスクのため、左上に承認待ちタスクのマークがついていることが確認できます。

次に対象の3つの障害に対する復旧作業のタスクを作成します。

  • インスタンスの作成
    本タスクの対象障害は「Compute Engineに指定のVMインスタンスが存在しない」場合となります。
    以下の定義からタスクを作成します。
項目 入力値
identifier task_04(デフォルト)
label 障害対応_インスタンスの作成
operator FunctionPluginOperator(デフォルト)
funcname createInstance
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn task_03
  • ATOMインスタンスのstatus更新
    本タスクの対象障害は「Compute Engineのシステム自体に障害が発生し、VMインスタンス情報が取得できない」場合となります。
    以下の定義からタスクを作成します。
項目 入力値
identifier task_05(デフォルト)
label 障害対応_ATOMインスタンスの更新
operator FunctionPluginOperator(デフォルト)
funcname updateATOMInstance
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn task_03
  • GCEインスタンスのmachineType更新
    本タスクの対象障害は「Compute Engineから取得したVMインスタンスのmachineTypeと、対応するATOMインスタンスのmachineTypeが異なる」場合となります。
    以下の定義からタスクを作成します。
項目 入力値
identifier task_06(デフォルト)
label 障害対応_GCEインスタンスのmachineTypeの更新
operator FunctionPluginOperator(デフォルト)
funcname updateMachineType
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn task_03

上記3つの障害復旧作業のタスクを作成すると、ワークフローの全体像は以下のようになります。

3つの復旧作業がOperatorの承認後に実施されることがわかります。

  • Operatorへの復旧確認通知
    復旧作業が実施された後にOperatorへ復旧内容の確認を行うため通知を行うタスクを作成します。
    以下の定義からタスクを作成します。
項目 入力値
identifier task_07(デフォルト)
label Operatorへの復旧内容通知
operator FunctionPluginOperator(デフォルト)
funcname notificationToOperator
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn task_04, task_05, task_06

上記のタスクを作成すると、ワークフローの全体像は以下のようになります。

3つの復旧作業とOperatorへの通知のタスクに依存関係があることがわかります。

  • Operatorによる復旧完了承認待ち
    復旧作業が完了した後に、復旧作業が完了したかを確認するためOperatorによる承認待ちのタスクを以下の定義より作成します。
項目 入力値
identifier task_08(デフォルト)
label Operatorによる承認待ち
operator FunctionPluginOperator(デフォルト)
funcname awaitApproval
approvalRequired 選択
idempotency 未選択(デフォルト)
dependsOn task_07

上記のタスクを作成すると、ワークフローの全体像は以下のようになります。

Operatorによる復旧内容の承認がされた後、システムの復旧をUserへ通知し、障害検知時に作成されたTicketのstatusを更新します。

  • Userへのシステム復旧通知
    以下の定義よりタスクを作成します。
項目 入力値
identifier task_09(デフォルト)
label Userへのシステム復旧通知
operator FunctionPluginOperator(デフォルト)
funcname notificationToUser
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn task_08
  • Ticketの更新
    以下の定義よりタスクを作成します。
項目 入力値
identifier task_10(デフォルト)
label Ticketの更新
operator FunctionPluginOperator(デフォルト)
funcname updateTicketInstance
approvalRequired 未選択(デフォルト)
idempotency 未選択(デフォルト)
dependsOn task_08

以上2つのタスクを作成すると、ワークフローの全体像は以下のようになります。

以上でDAGのワークフローを作成することができました。

Testの実装

Test内容確認

ここまでで、ScenarioとDAGのワークフローを実装しました。
それらのシステムが正常に動作するのか、単体試験と結合試験を行います。
これから実装するTestの概要は以下になります。

単体試験

単体試験はAPI開発チュートリアル同様にUnitTestによって行います。
UnitTestの実装手順は以下になります。

  1. Fakerの作成
    • テスト実行時に、外部サービスとの実際の通信を行わずに、あらかじめ定義した疑似動作(フェイク関数)を実行して応答を模擬する仕組みです。
  2. Illusionの作成
    • Fakerで定義されたフェイク関数群を、テストシーンに適用するための設定・割り当ての仕組みです。これにより、特定のテスト実行時に外部環境の動作を模擬できます。
  3. TestCaseの作成
    • 一つのテストシナリオを構成する単位で、入力条件や期待する出力、検証(assert)処理などを記述し、個別の機能やプラグインの動作をテストします

上記のUnitTestをScenarioとDAGに実装することによって、各障害ケースにてScnarioとDAGが正常な処理ができるか確認します。

結合試験(実施自体には前提条件が必要だが、目を通すことを推奨)

結合試験では実際に障害を発生させ、Scenarioによる障害検知からDAGによる障害復旧までの一連の処理が正常に動作するかを確認します。

結合試験ではFakerやIllusionを使用しないため、以下ができることが前提条件となります。

  • Google CloudでComputeEngineの権限を持つサービスアカウントを作成している。
  • Google CloudのCompute Engine APIを用いてCompute EngineインスタンスをCRUDすることができる。
  • SlackのAppを作成し、WorkSpaceにインストールできる。

Caution

結合試験を実施する上での前提条件とはなりますが、DAGのワークフロー、taskの遷移の様子を確認できるので目を通すことをお勧めします。

Fakerの実装

実装内容確認

本チュートリアルでは、以下のFakerを用意します。

faker名 想定する用途
getAccessToken (API開発チュートリアルで実装済み)疑似的にGETしてGoogle Cloud APIのアクセストークンを取得した際の挙動をする
getInstanceFromGCE GCEのVMインスタンス情報を取得するAPIを模擬する
getInstanceByScenario GCEのVMインスタンス情報を取得するScenarioを模擬する
createDagInstance DAGインスタンスを作成するfunctionを模擬する
postSlack slackに対して通知を行う処理を模擬する
recoveryCreateInstance 障害の復旧作業として、GCEのVMインスタンス作成を模擬する
recoveryUpdateMachineType 障害の復旧作業として、GCEのVMインスタンスのMachineTypeの更新を模擬する

Fakerの作成

それではFakerを一つ一つ作成していきましょう。
Unit Test as a Service>Faker > Create New Faker からFakerを作成することができます。
Fakerの作成方法の詳細はAPI開発チュートリアルを参考にしてください。
本チュートリアルでは以下のFakerを作成してください。

  • faker①
    name: getAccessToken (API開発チュートリアルで実装済みのため追加実装不要) category: SampleCrudApi
    fakes:
async def Success(*args, **kwargs):
    return FakeHttpResponse(
        200,
        body=json.dumps({"access_token": "xxxxx"}).encode("utf-8")
        )
async def InternalError(*args, **kwargs):
    return FakeHttpResponse(500)
async def NotFound(*args, **kwargs):
    return FakeHttpResponse(404)
  • faker②
    name: getInstanceFromGCE
    category: SampleDag
    fakes:
async def Success(*args, **kwargs):
    res_body = {
            "id": "111"
    }
    return FakeHttpResponse(
        200,
        body=json.dumps(res_body).encode("utf-8")
    )
  • faker③
    name: getInstanceByScenario
    category: SampleDag
    fakes:
async def Success(*args, **kwargs):
    res_body = {
        "instance_info": {
            "id": "222",
            "machineType": "https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-medium"
        }
    }
    return FakeHttpResponse(
        200,
        body=json.dumps(res_body).encode("utf-8")
    )
async def InstanceNotFound(*args, **kwargs):
    res_body = {
        "instance_info": {
            "error": {
                "code": 404
            }
        }
    }
    return FakeHttpResponse(
        200,
        body=json.dumps(res_body).encode("utf-8")
    )
async def GCEFailure(*args, **kwargs):
    return FakeHttpResponse(
        599
    )
async def MachineTypeMismatch(*args, **kwargs):
    res_body = {
        "instance_info": {
            "machineType": "https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-small"
        }
    }
    return FakeHttpResponse(
        200,
        body=json.dumps(res_body).encode("utf-8")
    )
  • faker④
    name: createDagInstance
    category: SampleDag
    fakes:
async def Success(*args, **kwargs):
    return "111"
  • faker⑤
    name: postSlack
    category: SampleDag
    fakes:
async def Success(*args, **kwargs):
    return FakeHttpResponse(200)
  • faker⑥
    name: recoveryCreateInstance
    category: SampleDag
    fakes:
async def Success(*args, **kwargs):
    return FakeHttpResponse(200)
  • faker⑦
    name: recoveryUpdateMachineType
    category: SampleDag
    fakes:
async def Success(*args, **kwargs):
    return FakeHttpResponse(200)

Fakerの挿入

今回用意したFakerの挿入箇所をまとめておきます。
以下を参考に、それぞれのScenarioに対してFakerを挿入してください。

Scenario: GetInstance

  • Toyblock①: get access token
    faker: getAccessToken
    挿入するcallout関数:
faker("getAccessToken")
r = await callout(
    # API開発チュートリアルで作成したアクセストークンの取得APIを使用
    path="/sample_crud_api/get_access_token",
    method="GET"
    )
  • Toyblock②: get instance from GCE
    faker: getInstanceFromGCE
    挿入するcallout関数:
faker("getInstanceFromGCE")
r = await callout(
    path=f"/SampleDag/{get_instance.projectId}/{get_instance.zone}/{get_instance.instanceName}",
    method="GET",
    headers={
        "Authorization": f"Bearer {access_token}"
    }
    )

Scenario: DetectSystemFailure

  • Toyblock①: get instance from GCE
    faker: getInstanceByScenario
    挿入するcallout関数:
# GCEのインスタンス情報を取得
faker("getInstanceByScenario")
r = await callout(
    path=f"/sample_dag/instances/{get_instance.gceId}",
    method="GET"
    )
response = r
  • Toyblock②: start dag workflow
    faker: createDagInstance
    挿入するcallout関数:
# DAGインスタンスの作成・ワークフロー開始
    faker("createDagInstance")
    dag_Instance_Id = await context.createDagInstance(dagName="RecoveryWork", gce_id=gce_id, cause=cause, user_name="user", operator_name="operator", msg_type="error", resolution=resolution)

以上でScenarioへの挿入は完了です。
次に以下を参考にして、それぞれのFunctionに対してFakerを挿入してください。

Function①: notificationToUser

  • faker: postSlack
    挿入するcallout関数:
faker("postSlack")
r = await callout(
  url="https://slack.com/api/chat.postMessage",
  headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
  method=POST,
  body=params
  )

Function②: notificationToOperator

  • faker: postSlack
    挿入するcallout関数:
# slack通知(tokenには自身が作成したOAuth Tokensを設定)
faker("postSlack")
r = await callout(
  url="https://slack.com/api/chat.postMessage",
  headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
  method=POST,
  body=params
  )

Function③: createInstance

  • faker: recoveryCreateInstance
    挿入するcallout関数:
faker("recoveryCreateInstance")
r = await Scenario.run("InstanceCreate", method=POST, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName)
  • faker: getInstanceByScenario
    挿入するcallout関数:
faker("getInstanceByScenario")
get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)

Function④: updateMachineType

  • faker: recoveryUpdateMachineType
    挿入するcallout関数:
faker("recoveryUpdateMachineType")
r = await Scenario.run("InstanceUpdate", method=PUT, headers={"Content-Type": "application/json"}, instance_name=target_instance.instanceName, machine_type=target_instance.machineType)
  • faker: getInstanceByScenario
    挿入するcallout関数:
faker("getInstanceByScenario")
get_instance = await Scenario.run("GetInstance", method=GET, headers={"Content-Type": "application/json"}, gce_id=gce_id)

以上でFakerの実装が完了です。

Illusionの実装

実装内容確認

本チュートリアルでは、以下のIllusionを用意します。

Illusion名 想定する場合
GetInstanceSuccess ScenarioによるGCEのインスタンス情報の取得が成功する場合
DetectNoSystemFailure Scenarioによる障害検知で障害が検知されない場合
DetectInstanceNotFound Scenarioによる障害検知でGCEに指定のインスタンスが存在しない障害を検知した場合
DetectGCEFailure Scenarioによる障害検知でGCEのシステム自体の障害を検知した場合
DetectMachineTypeMismatch Scenarioによる障害検知でGCEとSDKでMachineTypeの情報が一致しない障害を検知した場合
RecoveryInstanceNotFound DAGによるGCEに指定のインスタンスが存在しない障害へ復旧対応する場合
RecoveryGCEFailure DAGによるGCEのシステム自体の障害へ復旧対応する場合
RecoveryMachineTypeMismatch DAGによるGCEとSDKでMachineTypeの情報が一致しない障害へ復旧対応する場合

Illusionの作成

それではIllusionを一つ一つ作成していきましょう。 [Unit Test as a Service]>[Illusion]>[Create New Illusion] からIllusionを新規作成することができます。
Illusionの詳細な作成方法はAPI開発チュートリアルを参考にしてください 本チュートリアルでは以下のIllusionを作成してください。

name category fakes
GetInstanceSuccess SampleDag getAccessToken - Success
getInstanceFromGCE - Success
DetectNoSystemFailure SampleDag getInstanceByScenario - Success
DetectInstanceNotFound SampleDag getInstanceByScenario - InstanceNotFound
createDagInstance - Success
DetectGCEFailure SampleDag getInstanceByScenario - GCEFailure
createDagInstance - Success
DetectMachineTypeMismatch SampleDag getInstanceByScenario - MachineTypeMismatch
createDagInstance - Success
RecoveryInstanceNotFound SampleDag postSlack - Success
recoveryCreateInstance - Success
getInstanceByScenario - Success
RecoveryGCEFailure SampleDag postSlack - Success
RecoveryMachineTypeMismatch SampleDag postSlack - Success
getInstanceByScenario - Success
recoveryUpdateMachineType - Success

以上でIllusionの実装が完了です。

Testcaseの実装

実装内容確認

本チュートリアルでは、以下のTestcaseを用意します。

Test Case 対象Service 適用するIllusion 想定するケース
GetInstanceSuccess Scenario GetInstanceSuccess GCEへのインスタンス情報取得APIが成功するケース
DetectNoSystemFailure Scenario DetectNoSystemFailure 障害検知APIで障害が検知されないケース
DetectInstanceNotFound Scenario DetectInstanceNotFound 障害検知APIでGCE上にVMインスタンスが確認できない障害を検知するケース
DetectGCEFailure Scenario DetectGCEFailure 障害検知APIでGCEのシステム自体の障害を検知するケース
DetectMachineTypeMismatch Scenario DetectMachineTypeMismatch 障害検知APIでGCEとSDKでMachineTypeの情報が一致しない障害を検知するケース
RecoveryInstanceNotFound DAG RecoveryInstanceNotFound DAGによるGCEに指定のインスタンスが存在しない障害へ復旧対応する場合
RecoveryGCEFailure DAG RecoveryGCEFailure DAGによるGCEのシステム自体の障害へ復旧対応する場合
RecoveryMachineTypeMismatch DAG RecoveryMachineTypeMismatch DAGによるGCEとSDKでMachineTypeの情報が一致しない障害へ復旧対応する場合

Test Caseの作成

それでは一つずつTestcaseを作成していきます。
[Unit Test as a Service]>[Test Case]>[Create New Testcase] からTestcaseを作成することができます。
ScenarioとDAGではTestcaseの作成方法が異なります。
Scenarioに対するTestcaseの詳細な作成方法はAPI開発チュートリアルを参考にしてください。

まずはScenarioに対するTestcaseを以下の定義を参考に5つ作成してください。

Testcase①

Test Target Type: Scenario
name: GetInstanceSuccess
category: SampleDag
target: GetInstance
illusion: GetInstanceSuccess
Preparation:

async def preparation():
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName="test-instance",
        gceId="111",
        machineType="e2-medium"
        )
    # データベースに保存する
    await test_instance.save()

Assert Begin:

async def hoge(*args, **kwargs):
    pass

Scenario Input

  • method: GET
  • path: /sample_dag/instances/111
  • headers:
{
    "Content-Type": "application/json"
}

Scenario Input(script)

  • get access token: -
  • get instance from SDK: -
  • get instance from GCE: -

Scenario Output:

async def assertion():
    assert Response.code == 200, "Invalid response code %r" % Response.code

Scenario Output(script)

  • get access token: -
  • get instance from SDK: -
  • get instance from GCE: -

Assert End:

async def assertion():
    gce_id = json.loads(Response.body.decode("utf-8"))["instance_info"]["id"]

    # fakerからインスタンスIDの情報を取得できているか確認
    assert gce_id == "111", "%r" % Response.body.decode("utf-8")

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

Testcase②

Test Target Type: Scenario
name: DetectNoSystemFailure
category: SampleDag
target: DetectSystemFailure
illusion: DetectNoSystemFailure
Preparation:

async def preparation():
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName="test-instance",
        gceId="111",
        machineType="e2-medium"
        )
    # データベースに保存する
    await test_instance.save()

Assert Begin:

async def hoge(*args, **kwargs):
    pass

Scenario Input

  • method: GET
  • path: /sample_dag/instances/111/detect
  • headers:
{
    "Content-Type": "application/json"
}

Scenario Input(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Scenario Output:

async def assertion():
    assert Response.code == 200, "Invalid response code %r" % Response.code

Scenario Output(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Assert End:

async def assertion():
    cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
    # causeに障害が検知出来ていない旨(null)が設定されているか確認
    assert cause == "null", "%r" % Response.body.decode("utf-8")

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

Testcase③

Test Target Type: Scenario
name: DetectInstanceNotFound
category: SampleDag
target: DetectSystemFailure
illusion: DetectInstanceNotFound
Preparation:

async def preparation():
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName="test-instance",
        gceId="111",
        machineType="e2-medium"
        )
    # データベースに保存する
    await test_instance.save()

Assert Begin:

async def hoge(*args, **kwargs):
    pass

Scenario Input

  • method: GET
  • path: /sample_dag/instances/111/detect
  • headers:
{
    "Content-Type": "application/json"
}

Scenario Input(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Scenario Output:

async def assertion():
    assert Response.code == 200, "Invalid response code %r" % Response.code

Scenario Output(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Assert End:

async def assertion():
    cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
    dag_instance_id = json.loads(Response.body.decode("utf-8"))["detect_result"]["dag_instance_id"]
    # 障害理由と作成されたDAGのinstanceIdを確認
    assert cause == "Instance not found", "%r" % Response.body.decode("utf-8")
    assert dag_instance_id == "111", "%r" % Response.body.decode("utf-8")

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

Testcase④

Test Target Type: Scenario
name: DetectGCEFailure
category: SampleDag
target: DetectSystemFailure
illusion: DetectGCEFailure
Preparation:

async def preparation():
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName="test-instance",
        gceId="111",
        machineType="e2-medium"
        )
    # データベースに保存する
    await test_instance.save()

Assert Begin:

async def hoge(*args, **kwargs):
    pass

Scenario Input

  • method: GET
  • path: /sample_dag/instances/111/detect
  • headers:
{
    "Content-Type": "application/json"
}

Scenario Input(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Scenario Output:

async def assertion():
    assert Response.code == 200, "Invalid response code %r" % Response.code

Scenario Output(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Assert End:

async def assertion():
    cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
    dag_instance_id = json.loads(Response.body.decode("utf-8"))["detect_result"]["dag_instance_id"]
    # 障害理由と作成されたDAGのinstanceIdを確認
    assert cause == "Failure in GCE system", "%r" % Response.body.decode("utf-8")
    assert dag_instance_id == "111", "%r" % Response.body.decode("utf-8")

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

Testcase⑤

Test Target Type: Scenario
name: DetectMachineTypeMismatch
category: SampleDag
target: DetectSystemFailure
illusion: DetectMachineTypeMismatch
Preparation:

async def preparation():
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName="test-instance",
        gceId="111",
        machineType="e2-medium"
        )
    # データベースに保存する
    await test_instance.save()

Assert Begin:

async def hoge(*args, **kwargs):
    pass

Scenario Input

  • method: GET
  • path: /sample_dag/instances/111/detect
  • headers:
{
    "Content-Type": "application/json"
}

Scenario Input(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Scenario Output:

async def assertion():
    assert Response.code == 200, "Invalid response code %r" % Response.code

Scenario Output(script)

  • get instance from SDK: -
  • get instance from GCE: -
  • detect failure: -
  • start dag workflow: -

Assert End:

async def assertion():
    cause = json.loads(Response.body.decode("utf-8"))["detect_result"]["cause"]
    dag_instance_id = json.loads(Response.body.decode("utf-8"))["detect_result"]["dag_instance_id"]
    # 障害理由と作成されたDAGのinstanceIdを確認
    assert cause == "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType", "%r" % Response.body.decode("utf-8")
    assert dag_instance_id == "111", "%r" % Response.body.decode("utf-8")

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

以上でScenarioに対するTestcaseの実装は完了です。

次にDAGのワークフローに対するTestcaseの実装に入ります。

Scenario同様に[Unit Test as a Service]>[Test Case]>[Create New Testcase] からTestcaseを作成することができます。 以下の画面でTest Target TypeDagを設定してください。

次にTestcaseのnamecategorytargetを入力します。 targetには、今回テストしたいDAGを選択してください。

まずはGCEにVMインスタンスが存在しない場合のTestcaseを作成するので、以下を入力してください。
name: RecoveryInstanceNotFound
caterogory: SampleDag
target: RecoveryWork

次にillusionの設定ですが、scenarioのTestcase同様にTestcase Settingから以下を設定します。
illusion:RecoveryInstanceNotFound

次にmetadataの設定です。

Testcase Settingタブのmetadataに以下を設定します。 ここで設定したmetadataはDAGのテストの際に初期のmetadataとしてDAGのインスタンスに受け渡されます。

{
    "instance_name": "test-instance",
    "cause": "Instance not found",
    "user_name": "user",
    "operator_name": "operator",
    "msg_type": "error",
    "resolution": "Create the GCE instance",
    "gce_id": "111",
    "dag_instance_id": "1"
}

次にFlow Settingタブを開いてください。

Note

それぞれの項目の説明は以下になります。

  • Preparation(準備)
    テスト実行前の初期化や環境設定を行います。たとえば、テスト対象のデータやモックのセットアップを行います。

  • Assertion(テスト結果への検証)
    DAGのテスト実行後の応答結果や状態変化を確認・記録します。出力内容が期待通りかをチェックするための処理です。

  • Cleanup(後処理)
    テスト実行後に環境を元に戻すための処理です。たとえば、作成したテストデータの削除やリソースの解放などを行います。

また、PreparationAssertionの中間のプラスボタンをクリックするとFlow Settingの画面は以下のようになります。

新たに出現した設定項目ではDAGのワークフローへの中間処理(Taskの承認Assertion等)を設定することができます。

Note

それぞれの項目の説明は以下になります。

  • 対象Task
    中間処理を実施するトリガーは対象タスクが特定のStatusに遷移した時のため、本項目ではトリガーとなるTaskを指定できます。

  • TaskのStatus
    本項目では中間処理を実行するためのトリガーとなるTaskのStatusを設定できます。 DAGのTaskにはStatusが存在し、ワークフローが実行される中で複数の状態に遷移します(プログラミングガイド参照)。

  • interaction
    中間処理を設定できます。 中間処理ではワークフローの途中結果の検証や、承認待ちTaskへの承認などができます。

今回のTestcaseではFlow Settingを以下のように設定します。

Preparation:

async def preparation():
    gce_id = "111"
    instance_name = "test-instance"
    machineType = "e2-medium"
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName=instance_name,
        gceId=gce_id,
        machineType=machineType
        )
    # データベースに保存する
    await test_instance.save()

Assertion for Tasks

  • target: task03
    status: ApprovalPending
    interaction:
async def interaction():
    await dagInstance.approval(["task_03"], name="operator", withRun=True, _illusion = "RecoveryInstanceNotFound")
  • target: task08
    status: ApprovalPending
    interaction:
async def interaction():
    await dagInstance.approval(["task_08"], name="operator", withRun=True, _illusion = "RecoveryInstanceNotFound")

Note

task03task08はoperatorによる承認待ちタスクのため、StatusがApprovalPending(承認待ち)の時、承認を実行する。 引数のwithRunはTaskの承認と同時にワークフローの再開を行う場合にTrueに設定する。

Assertion:

async def assertion():
    # DAGインスタンスのワークフローが実装されたか確認
    assert dagInstance.status == "Done", "%r" % dagInstance.status

    # ATOMインスタンスのgce_idが更新されているか確認
    target_instances = await atom.VmInstance.retrieve(gceId="222")
    target_instance = target_instances[0]
    assert target_instance, "%r" % target_instance

    # Ticketインスタンスのstatusが"resolve"になっているか確認
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    ticket_instance = ticket_instances[0]
    assert ticket_instance.status == "Resolve", "%r" % ticket_instance

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="222")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="222")
    assert test_instances == [], "Test instance exists"

    # テスト用のTicketInstanceを削除する
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    ticket_instance = ticket_instances[0]
    await ticket_instance.destroy()

    # テスト用のTicketInstanceの削除を確認する
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    assert ticket_instances == [], "Ticket instance exists"

    # テスト用のDAGInstanceを削除する
    await dagInstance.destroy()
    dag_instance = await DAGInstance.load(dagInstance.instance, noException=True)
    assert dag_instance is None, "Dag instance exists"

残り2つのTestcaseも以下の設定を参考に作成してください。

Testcase②
name: RecoveryGCEFailure
caterogory:SampleDag
target: RecoveryWork
illusion:RecoveryGCEFailure
metadata

{
    "instance_name": "test-instance",
    "cause": "Failure in GCE system",
    "user_name": "user",
    "operator_name": "operator",
    "msg_type": "error",
    "resolution": "Change status of VmInstance to GCE SYSTEM ERROR",
    "gce_id": "111",
    "dag_instance_id": "1"
}

Preparation:

async def preparation():
    gce_id = "111"
    instance_name = "test-instance"
    machineType = "e2-medium"
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName=instance_name,
        gceId=gce_id,
        machineType=machineType
        )
    # データベースに保存する
    await test_instance.save()

Assertion for Tasks

  • target: task03
    status: ApprovalPending
    interaction:
async def interaction():
    await dagInstance.approval(["task_03"], name="operator", withRun=True, _illusion = "RecoveryGCEFailure")
  • target: task08
    status: ApprovalPending
    interaction:
async def interaction():
    await dagInstance.approval(["task_08"], name="operator", withRun=True, _illusion = "RecoveryGCEFailure")

Assertion:

async def assertion():
    # DAGインスタンスのワークフローが実装されたか確認
    assert dagInstance.status == "Done", "%r" % dagInstance.status

    # ATOMインスタンスのstatusが更新されているか確認
    target_instances = await atom.VmInstance.retrieve(gceId="111")
    target_instance = target_instances[0]
    assert target_instance.status == "GCE SYSTEM ERROR", "%r" % target_instance

    # Ticketインスタンスのstatusが"resolve"になっているか確認
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    ticket_instance = ticket_instances[0]
    assert ticket_instance.status == "Resolve", "%r" % ticket_instance

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

    # テスト用のTicketInstanceを削除する
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    ticket_instance = ticket_instances[0]
    await ticket_instance.destroy()

    # テスト用のTicketInstanceの削除を確認する
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    assert ticket_instances == [], "Ticket instance exists"

    # テスト用のDAGInstanceを削除する
    await dagInstance.destroy()
    dag_instance = await DAGInstance.load(dagInstance.instance, noException=True)
    assert dag_instance is None, "Dag instance exists"

Testcase③
name: RecoveryMachineTypeMismatch
caterogory:SampleDag
target: RecoveryWork
illusion:RecoveryMachineTypeMismatch
metadata

{
    "instance_name": "test-instance",
    "cause": "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType",
    "user_name": "user",
    "operator_name": "operator",
    "msg_type": "error",
    "resolution": "Update machineType of GCE instance",
    "gce_id": "111",
    "dag_instance_id": "1"
}

Preparation:

async def preparation():
    gce_id = "111"
    instance_name = "test-instance"
    machineType = "e2-medium"
    # テスト用のVmInstanceクラスのインスタンスを作成する
    test_instance = await atom.VmInstance(
        instanceName=instance_name,
        gceId=gce_id,
        machineType=machineType
        )
    # データベースに保存する
    await test_instance.save()

Assertion for Tasks

  • target: task03
    status: ApprovalPending
    interaction:
async def interaction():
    await dagInstance.approval(["task_03"], name="operator", withRun=True, _illusion = "RecoveryMachineTypeMismatch")
  • target: task08
    status: ApprovalPending
    interaction:
async def interaction():
    await dagInstance.approval(["task_08"], name="operator", withRun=True, _illusion = "RecoveryMachineTypeMismatch")

Assertion:

async def assertion():
    # DAGインスタンスのワークフローが実装されたか確認
    assert dagInstance.status == "Done", "%r" % dagInstance.status

    # Ticketインスタンスのstatusが"resolve"になっているか確認
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    ticket_instance = ticket_instances[0]
    assert ticket_instance.cause == "A mismatch exists between the GCE instance and the SDK instance data. Details:machineType", "%r" % ticket_instance
    assert ticket_instance.status == "Resolve", "%r" % ticket_instance

Cleanup:

async def cleanup():
    # テスト用のInstanceを削除する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    test_instance = test_instances[0]
    await test_instance.destroy()

    # テスト用のInstanceの削除を確認する
    test_instances = await atom.VmInstance.retrieve(gceId="111")
    assert test_instances == [], "Test instance exists"

    # テスト用のTicketInstanceを削除する
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    ticket_instance = ticket_instances[0]
    await ticket_instance.destroy()

    # テスト用のTicketInstanceの削除を確認する
    ticket_instances = await atom.TicketInstance.retrieve(ticketId=dagInstance.metadata["ticket_id"])
    assert ticket_instances == [], "Ticket instance exists"

    # テスト用のDAGInstanceを削除する
    await dagInstance.destroy()
    dag_instance = await DAGInstance.load(dagInstance.instance, noException=True)
    assert dag_instance is None, "Dag instance exists"

以上でScenarioとDAGに対するTestcaseの実装が完了です。

UnitTest(単体試験)

最後に作成したテストを実行してみます。
[Unit Test as a Service]>[Unit Test]を開きます。
UnitTestの詳細な使用方法の説明はAPI開発チュートリアルに記載のため省略します。
今回作成した以下8つのTestcaseを実行し、Test進捗が100%になることを確認してください。

Test Case 対象Service 想定するケース
GetInstanceSuccess Scenario GCEへのインスタンス情報取得APIが成功するケース
DetectNoSystemFailure Scenario 障害検知APIで障害が検知されないケース
DetectInstanceNotFound Scenario 障害検知APIでGCE上にVMインスタンスが確認できない障害を検知するケース
DetectGCEFailure Scenario 障害検知APIでGCEのシステム自体の障害を検知するケース
DetectMachineTypeMismatch Scenario 障害検知APIでGCEとSDKでMachineTypeの情報が一致しない障害を検知するケース
RecoveryInstanceNotFound DAG DAGによるGCEに指定のインスタンスが存在しない障害へ復旧対応する場合
RecoveryGCEFilure DAG DAGによるGCEのシステム自体の障害へ復旧対応する場合
RecoveryMachineTypeMismatch DAG DAGによるGCEとSDKでMachineTypeの情報が一致しない障害へ復旧対応する場合

以上で単体試験が終了になります。
結合試験が実施できない方は以上でDAGのチュートリアルは修了になりますが、結合試験ではDAGのワークフローやTaskの状態遷移の様子を理解することができるため、目を通すことを推奨しています。

結合試験

結合試験では実際に障害を発生させ、Scenarioによる障害検知からDAGによる障害復旧までの一連の処理が正常に動作するかを確認します。
再掲になりますが、結合試験ではFakerやIllusionを使用しないため、以下ができることが前提条件となります。

  • Google CloudでComputeEngineの権限を持つサービスアカウントを作成している。
  • Google CloudのCompute Engine APIを用いてCompute EngineインスタンスをCRUDすることができる。
  • SlackのAppを作成し、WorkSpaceにインストールできる。

GCEにVMインスタンスがない場合

まずはSDK内のATOMインスタンスにはVMインスタンス情報があるが、GCE上には存在しない場合の結合試験を行います。

試験準備

まず実際に障害を発生させるための準備を行います。
準備としてはSDK内にのみVMインスタンス情報を作成します。

[Workflow Scenario as a Service]>[Scenario]を開いてください。 Interactive Shellから以下のコードを入力してください。

# GCEのidを111とするATOMインスタンスを作成(instanceNameはまだGCEに作成されていない名前であればOK)
>>> test_instance = await atom.VmInstance(instanceName="test-instance",gceId="111")↵
... ↵
↵
# 作成したインスタンス情報を確認
>>> print(test_instance)↵
... ↵
↵
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='111')

# インスタンスを保存
>>> await test_instance.save()↵
>>> ↵
↵

# GCEのidをkeyにインスタンスを検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="111")↵
... ↵
↵
>>> print(test_instances)↵
... ↵
↵
# 取得できたインスタンスの個数が1つであればOK
[VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='111')]

# instanceNameをkeyにインスタンス検索
>>> test_instances = await atom.VmInstance.retrieve(instanceName="test-instance")↵
... ↵
↵
# 取得できたインスタンスの個数がgceId='111'の1つのみであればOK
[VmInstance(instance='Vm1JbnN0YW5jZTo4MTY1ODc2ZTljZGExMWYwYTliZTI2NDY3MTA0YWNiNg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='************', zone='************', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='111')]

対象のインスタンスが複数存在する場合には、プログラミングガイド>Scenario>ATOMを参考にインスタンスを削除してください。
以上により、SDK内にはインスタンス情報が存在するが、GCE上にはVMが存在していないというシチュエーションを再現できました。

結合試験

それでは障害検知のScenarioをTry API Callから呼び出しましょう。 url resourceにはSDK内に作成したインスタンスのgce_idである111を設定してください。

APIを叩くと、障害を検知したScenarioがDAGのワークフローを実行します。
ワークフローではまずoperatorとuserへ通知を行うので、Slack通知を受信したか確認しましょう。

User向けのチャンネルでは以下のようなメッセージが送信されます。

Operator向けのチャンネルでは以下のようなメッセージが送信されます。

現在DAGのワークフローは以下のようになっています。

Operatorによる承認待ちをしている状態なので、通知を受け取ったOperatorはDAGのインスタンスを確認します。
通知に記載のDAGInstanceIdをコピーし、[Workflow Scenario as a Service]>[DAG]を開きます。(インスタンス情報を確認する場合はControl PanelのDAGになります。)

左上の検索窓にコピーしたDAGInstanceIdを入力し検索すると以下の画像のようになります。

Note

画面の説明をします。

  • 赤枠:DAGインスタンスの各Taskのstatusやワークフローの進行状況をグラフィカルに確認することができます。

  • 緑枠:各Taskの状態遷移を時系列で確認できます。

  • 青枠:Taskの状態遷移と色の関係を確認できます。

DAGのワークフローの3つの並列TaskはDoneになっていることが確認できます。 また、現在はOperatorの承認待ちTaskがApprovalPendingになっていることを確認できます。 もし、Taskが失敗した場合、以下のようになります。

次にSlack通知の他にシステム障害のTicketが作成されているはずなので確認してみましょう。
画面右上の再生マークをクリックしてください。
すると、以下のように現在のmetadataが確認できます。(一度画面を更新する必要があるかもしれません)

metadataの項目にticket_idが記載されているため、コピーしてください。
次にInteractive Shellにて以下のコードを入力してください。

# ticket_idでTicket検索
>>> tickets = await atom.TicketInstance.retrieve(ticketId="175")↵
... ↵
>>> print(tickets)↵
... ↵
↵
# 今回作成されたTicketはstatusがNewでcause, resolutionが今回の障害に対応指定ればOK
[TicketInstance(instance='VGlja2V0SW5zdGFuY2U6MzdhMGU4ZjQ3MzY3MTFmMDhhZjczNmU4MGU1ZjI3ZWI=', xid=None, xname=None, ticketId=175, status='New', instanceName='test-instance', cause='Instance not found', resolution='Create the GCE instance')]

それではワークフローに戻ります。
現在Operatorによる承認待ちなので、承認をします。
以下のように承認待ちTaskをクリックし、Approvalボタンをクリックします。

以下のような画面になり、承認時に渡せるmetadataを設定できます。

承認待ちTaskでは、承認者の名前を受け取るため以下のmetadataを設定し、Approveボタンをクリックしましょう。

{
    "name":"operator"
}

承認は実施しましたが、ワークフロー自体は再開していません。
なので、右上の再生マークをクリックし、Runボタンをクリックしてください。
ワークフローが再開されたことで、復旧作業が開始されます。
今回の復旧作業はGCEにインスタンスを作成するため、以下のワークフロー図のTask04が動作し、他の復旧作業はSkipされました。

Caution

Taskの中で復旧作業がSkipされただけで、Task自体はSkipされていないことに注意

復旧作業終了後、DAGのワークフローは以下のような状態になります。

Operatorへの復旧確認通知が送信され、Operatorによる復旧作業承認待ちとなっています。
Operator向けのSlackチャンネルには以下のメッセージが送信されているか確認してください。
復旧確認通知ではMsgTypeがrecoveryになっているはずです。

次に復旧作業が正しく行われたか、確認します。
まずは手元のPCからgcloudコマンドを利用して、GCEからインスタンス情報を取得します。

# 以下のコマンドでVM情報の取得(instance_nameとzoneは自身で記載)
gcloud compute instances describe <instance_name> --zone=<zone>

上記のコマンドでVM情報が出力された場合、GCEにVMを作成することができている。
取得できたVM情報からインスタンスIDをコピーしてください。
gcloudが使用できない場合、Google Cloudのコンソール上から確認してください。)

GCE上にインスタンスが作成されたことが確認できた後、SDK内の既存のVMインスタンスのインスタンスID(gceId)が更新されているかを確認します。 Interactive Shellにて以下のコードを入力してください。

# Google CloudのコンソールからコピーしたインスタンスIDで検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="*****")↵
... ↵
↵
>>> print(test_instances)↵
... ↵
↵
# 元々のATOMインスタンスのinstanceIdと同じであればOK(復旧作業中にgceIdが更新されている)
[VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='*********', zone='*********', machineType='e2-medium', sourceImage=None, network=None, status=None, gceId='*******')]
>>>

# 初期のgceIdは「111」

復旧作業の確認ができたらOperatorによる承認を行い、ワークフローを再開します。(承認時、承認者のnamemetadataに設定してください)
再開後、DAGのインスタンスは以下のようになります。

全てのTaskがDoneに遷移したことが確認できます。
また、UserへのSlack通知も確認してください。
最後にTicketが更新されたかを確認します。
metadataからticket_idをコピーし、Interacitve Shellから以下のコードを入力します。

# ticketIdから対象のTicketを検索します。
>>> ticket = await atom.TicketInstance.retrieve(ticketId="175")↵
... ↵
↵
>>> print(ticket)↵
... ↵
↵
# statusがResolveになっていればOK
[TicketInstance(instance='VGlja2V0SW5zdGFuY2U6MzdhMGU4ZjQ3MzY3MTFmMDhhZjczNmU4MGU1ZjI3ZWI=', xid=None, xname=None, ticketId=175, status='Resolve', instanceName='test-instance', cause='Instance not found', resolution='Create the GCE instance')]
>>>

以上で本ケースの結合試験が終了です。

GCEのシステム自体に障害が発生する場合

試験準備

本試験では「GCEにVMインスタンスがない場合」の試験で作成したATOMインスタンスを使用するため、削除した場合は再作成してください。
また、インスタンスのgceIdをメモしてください。

本結合試験ではGCEシステム自体の障害を再現しなければなりません。
しかし、再現は難しいので障害検知ScenarioでIllusionを使用します。
新しく以下のIllusionを作成してください。

name category fakes
TestGCEFailure SampleDag getInstanceByScenario - GCEFailure

結合試験

それでは障害検知のScenarioをTry API Callから呼び出しましょう。
url resourceにはインスタンスのgce_idを設定してください。
またIllusionを使用するためにHTTP HederのX-Xaas-IllusionTestGCEFailureを設定してください。

Execute Debugをクリックし、APIを呼び出します。
障害を検知し、DAGのインスタンスが作成・実行されUserとOperator宛てにSlack通知が飛んでいるはずなので確認してください。
また、通知内容のcause(障害原因)、resolution(対応策)なども確認してください。
Operatorには作成されたDagのInstanceIdが通知されているので、コピーし、Control Panelでインスタンスを確認してください。

DAGのワークフローはOperatorの承認待ちになっていることが確認できます。
「GCEにVMインスタンスがない場合」の結合試験同様に新しくTicketが作成されていることを確認し、確認できたら承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)

承認後、復旧作業が開始されます。
本障害では、復旧作業(障害対処)として、GCEのシステムに障害がある旨をSDK内のATOMインスタンスのstatusに設定します。
Interactive Shellで以下のコードを入力し、障害復旧(障害対処)が完了しているか確認します。

# 対象インスタンスのgceIdで検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="*****")↵
... ↵
↵
>>> print(test_instances)↵
... ↵
↵
# statusにGCE SYSTEM ERRORが入力されていればOK
[VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='*******', zone='*********', machineType='e2-medium', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='4841664086913176400')]
>>>

復旧作業が確認できたら、OperatorによるTaskの承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)
「GCEにVMインスタンスがない場合」の結合試験同様にUserへの復旧完了通知とTicketの更新が正しく行われたか確認してください。

以上で本ケースの結合試験が終了です。

GCEとSDKでインスタンスのMachineTypeが異なる場合

試験準備

本試験では「GCEにVMインスタンスがない場合」の試験で作成したGCEのインスタンスとATOMインスタンスを使用するため、削除した場合は再作成してください。(GCEのインスタンスIDとATOMインスタンスのgceIdは同じ値である必要があります) また、インスタンスのgceIdをメモしてください。

本結合試験ではSDK内のMachineType情報を手動で変更することで、障害を再現します。 InteractiveShellに以下のコードを入力し、MachineTypeを変更してください。

# gceIdで対象インスタンスを検索
>>> test_instances = await atom.VmInstance.retrieve(gceId="******")↵
... ↵
↵
>>> test_instance = test_instances[0]↵
... ↵
↵
>>> print(test_instance)↵
... ↵
↵
# machineTypeがe2-mediumであることを確認
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='******', zone='*********', machineType='e2-medium', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='********')

# machineTypeをe2-smallに更新
>>> test_instance.machineType = "e2-small"↵
... ↵
↵
>>> print(test_instance)↵
... ↵
↵
# machineTypeがe2-smallに更新されていることを確認
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='******', zone='*********', machineType='e2-small', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='*******')

# インスタンスを保存
>>> await test_instance.save()↵
... ↵
↵

# 以下保存されていることを確認
>>> test_instances = await atom.VmInstance.retrieve(gceId="*******")↵
... ↵
↵
>>> test_instance = test_instances[0]↵
... ↵
↵
>>> print(test_instance)↵
... ↵
↵
# machineTypeがe2-smallであればOK
VmInstance(instance='Vm1JbnN0YW5jZTo3ZGYxMjM3YTczNjUxMWYwOGFmNzM2ZTgwZTVmMjdlYg==', xid=None, xname=None, instanceId=293, instanceName='test-instance', projectId='***********', zone='***********', machineType='e2-small', sourceImage=None, network=None, status='GCE SYSTEM ERROR', gceId='4841664086913176400')
>>>

gcloudコマンドにより、GCEのVM情報も確認しましょう。(gcloudが利用できない人はGCEのコンソール画面から確認してください)

gcloud compute instances describe <instance_name> --zone=<zone>


<省略>
machineType: https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-medium
<省略>

上記の出力結果よりmachineTypee2-mediumであることがわかり、先ほどのSDK内のATOMインスタンスの変更により、SDKとGCEのmachineTypeが異なることが確認できます。

結合試験

それでは障害検知のScenarioをTry API Callから呼び出しましょう。
url resourceにはインスタンスのgce_idを設定してください。
Execute Debugをクリックし、APIを呼び出します。

Scenario実行後、障害を検知し、DAGのインスタンスが作成・実行されUserとOperator宛てにSlack通知が飛んでいるはずなので確認してください。
また、通知内容のcause(障害原因)、resolution(解決策)なども確認してください。
Operatorには作成されたDAGのInstanceIdが通知されているので、コピーし、Control Panelでインスタンスを確認してください。

DAGのワークフローはOperatorの承認待ちになっていることが確認できます。
「GCEにVMインスタンスがない場合」の結合試験同様に新しくTicketが作成されていることを確認し、確認できたら承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)

承認後、復旧作業が開始されます。
本障害では、復旧作業として、SDKのMachineTypeを正しいとして、GCEのMachineTypeを更新します。

復旧作業の確認として、gcloudコマンドを利用してVM情報を取得します。(コンソール上の確認でも可)

gcloud compute instances describe <instance_name> --zone=<zone>


<省略>
machineType: https://www.googleapis.com/compute/v1/projects/<project_id>/zones/<zone>/machineTypes/e2-small
<省略>

上記の出力結果から、復旧後にmachineTypeがSDK内のインスタンス情報と同じe2-smallに変更されていることが確認できます。
GoogleCloudのコンソールからもMachineTypeがe2-smallに更新されていることを確認できます。
復旧作業が確認できたら、OperatorによるTaskの承認とワークフローの再開を行います。(承認時に承認者のnameをmetadataで設定します)
「GCEにVMインスタンスがない場合」の結合試験同様にUserへの復旧完了通知とTicketの更新が正しく行われたか確認してください。

以上で本ケースの結合試験が終了です。

ーー 以上、DAG開発のチュートリアルになります。