Simple Transaction

本チュートリアルでは、トランザクションを使用したアプリケーション開発について学習します。次のシーケンスで、Qmonus SDKのプラグインアプリケーションは、2つの異なるエンドポイントのAPIサービスを呼び出します。最初の呼び出しで、システムAはリソースデータを作成します。システムBも2回目の呼び出しでリソースデータを作成しますが、これが失敗した場合は、システムAのデータを削除する必要があるようなケースを想定します。

アプリケーションが外界に与えた変化をデータベースに逐次蓄積し、ワークフローの進行を管理しながら、失敗に備える実装はかなり面倒です。Qmonus SDKのトランザクションアシストを利用することで以下のようなステートフルなワークフロー実装を簡潔に記述できます。

事前準備

トランザクションアシストされたアプリケーションを作成する前準備として、システムAとBのモックを作成しておきます。モックは、データのCRUDができれば何でも良いので今回は、ATOMを使用して簡単なデータモデルとAPIを自動生成しておくと良いでしょう。 以下の定義で2つのATOMを作成してください。

- category: Tutorial
  name: SystemA
  persistence: true
  api_generation: true
  abstract: false
  attributes:
    local_fields:
      - field_name: name
        field_type: string
        field_persistence: true
        field_nullable: true
        field_immutable: false
        field_unique: false
      - field_name: description
        field_type: string
        field_persistence: true
        field_nullable: true
        field_immutable: false
        field_unique: false
    ref_fields: []
  methods:
    class_methods: []
    instance_methods: []
- category: Tutorial
  name: SystemB
  persistence: true
  api_generation: true
  abstract: false
  attributes:
    local_fields:
      - field_name: name
        field_type: string
        field_persistence: true
        field_nullable: true
        field_immutable: false
        field_unique: false
      - field_name: description
        field_type: string
        field_persistence: true
        field_nullable: true
        field_immutable: false
        field_unique: false
    ref_fields: []
  methods:
    class_methods: []
    instance_methods: []

Note

ATOMを作成したら、生成されたAPIを確認しておきましょう。上記のATOM定義では、api_basepathは指定していないので生成されるAPIは/apis/systemAsのようなリクエストパスで定義されているはずです。


Scenarioで実装する

事前準備が完了したら、シナリオを作成しましょう。次のyaml定義でシナリオを作成してください。このシナリオでは、データをシステムAとBに順番に送信します。シナリオの最後にブレークポイントを設定して、完全なロールバックが達成できることを確認します。ブレークポイントは強制的にワークフローの進行を中断し、即時キャンセルが呼び出されるため、ロールバックの動作を確認できます。

category: Tutorial
name: SimpleTransaction
uri: /simpleTransactions
method: POST
commands:
  - command: script
    kwargs:
      code: |-
        r = await callout(path="/apis/systemAs",
                          method="POST",
                          body=dict(SystemA=dict(name=context.request.body.system_a.name,
                                                 description=context.request.body.system_a.description)))
        if r.error:
            raise Error(r.code, reason=r.error.__str__())
        a = json.loads(r.body)
      cancellation:
        cancellable: true
        actions:
          - action_type: script
            code: |-
              if not a:
                  r = await callout(path="/apis/systemAs?name={}".format(context.request.body.system_a.name))
                  if r.error:
                      await context.qmonus.abort()
                      return
                  a = json.loads(r.body)

              r = await callout(path="/apis/systemAs/{}".format(a["SystemA"]["instance"]), method="DELETE")
              if r.error:
                  await context.qmonus.abort()
    label: SystemA
  - command: script
    kwargs:
      code: |-
        r = await callout(path="/apis/systemBs",
                          method="POST",
                          body=dict(SystemB=dict(name=context.request.body.system_b.name,
                                                 description=context.request.body.system_b.description)))
        if r.error:
            raise Error(r.code, reason=r.error.__str__())
        b = json.loads(r.body)
      cancellation:
        cancellable: true
        actions:
          - action_type: script
            code: |-
              if not b:
                  r = await callout(path="/apis/systemBs?name={}".format(context.request.body.system_b.name))
                  if r.error:
                      await context.qmonus.abort()
                      return
                  b = json.loads(r.body)

              r = await callout(path="/apis/systemBs/{}".format(b["SystemB"]["instance"]), method="DELETE")
              if r.error:
                  await context.qmonus.abort()
    label: SystemB
  - command: breakpoint
    kwargs:
      abort: true
      immediate_cancel: true
request_timeout: 60
connect_timeout: 60
spec:
  response:
    normal:
      codes:
        - 200
  request:
    body:
      type: object
      required:
        - system_a
        - system_b
      properties:
        system_a:
          type: object
          properties:
            name:
              type: string
            description:
              type: string
          required:
            - name
        system_b:
          type: object
          properties:
            name:
              type: string
            description:
              type: string
          required:
            - name
transaction:
  enable: true
  async: true
  xname_use_counter: false
  auto_begin: true
  auto_response: true
  xname: ''
  lock:
    lock_keys:
      - context.request.body.systemA.name
      - context.request.body.systemB.name
    retry_count: 3
    retry_interval: 1
  callback_options:
    url: ''
  xdomain: Tutorial
  xtype: Two-Phase Commit
  auto_rollback: true
routing_auto_generation_mode: true
global_variables:
  a:
    initial: null
    description: SystemA Instance variable
  b:
    initial: null
    description: SystemB Instance variable

シナリオの作成が完了したら、デバッグ画面から実行してみてください。UIで実行するとワークフローの進捗がグラフィカルに確認できます。 POSTするデータは以下のように適当なもので構いません。

POST /simpleTransactions
{
    "system_a": {
        "name": "aaa"
    },
    "system_b": {
        "name": "bbb"
    }
}


ロールバックが確認できたら、breakpointコマンドのimmediate_cancel=Falseに設定を変更して、再度実行してみましょう。
ワークフローが中断された状態で、ロックが残っていることを確認できます。ロックは、Transaction MonitorMutexesタブで確認できます。
また、トランザクション管理画面からcancelを実行すると、ロールバックできることを確認できます。


ATOMで実装する

オブジェクト指向の知識と経験が少し必要ですが、前述したシナリオと同等の処理をATOMを使用してオブジェクト指向で開発することもできます。
シナリオでは処理の流れを意識してワークフローを作成しました。これをオブジェクト指向で表現する場合、ワークフローをオブジェクトの状態遷移に置き換えて考えると、イメージしやすくなります。

モデリングとして、Orchestrationという概念をオブジェクト化してみます。Orchestrationは、システムAとBをセットにした概念として捉え、本オブジェクトが持つステートマシンでワークフローを表現します。システムAおよびBのCRUD-APIは、ShadowというATOMでラップします。システムAもBもこのチュートリアルではエンドポイントが異なるだけですので振る舞いは、スーパークラスに実装して抽象化します。OrchestrationにはAとB2つのATOMを包含させます。
以下のyaml定義に従ってそれぞれのATOMを作成してください。

Shadow

category: Tutorial
name: Shadow
persistence: false
api_generation: false
abstract: true
attributes:
  local_fields:
    - field_name: system
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_enum:
        - A
        - B
      field_metadata:
        ignore: true
    - field_name: description
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_metadata:
        ignore: false
    - field_name: origin
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_metadata:
        ignore: true
  ref_fields: []
  identifier:
    field_name: name
    field_type: string
    field_persistence: true
    field_immutable: true
    field_metadata:
      ignore: false
methods:
  class_methods: []
  instance_methods:
    - method_body: |-
        async def create(self, *args, **kwargs):
            print("作成中...%s" % self.system)
            r = await callout(path="/apis/system{}s".format(self.system),
                              method=POST,
                              body={"System{}".format(self.system): self.localfields(ignore=False)})
            if r.error:
                print("作成失敗...%s" % self.system)
                raise Error(r.code, reason=r.error.__str__())
            self.origin = json.loads(r.body)["System{}".format(self.system)]["instance"]
            print("作成完了...%s" % self.system)
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        async def delete(self, *args, **kwargs):
            print("削除要否判定...%s" % self.system)
            if not self.origin:
                r = await callout(path="/apis/system{}s?name={}".format(self.system, self.name))
                if r.error:
                    print("削除要否判定失敗...%s" % self.system)
                    raise Error(r.code, reason=r.error.__str__())
                payload = json.loads(r.body)
                if len(payload)==0:
                    print("削除不要...%s" % self.system)
                    return
                self.origin = payload[0]["System{}".format(self.system)]["instance"]

            print("削除中...%s" % self.system)
            r = await callout(path="/apis/system{}s/{}".format(self.system, self.origin), method=DELETE)
            if r.error:
                print("削除失敗...%s" % self.system)
                raise Error(r.code, reason=r.error.__str__())
            print("削除完了...%s" % self.system)
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend

ShadowA and B

- category: Tutorial
  name: ShadowA
  persistence: true
  api_generation: false
  abstract: false
  extends:
    - Shadow
  attributes:
    local_fields: []
    ref_fields: []
  methods:
    class_methods: []
    instance_methods:
      - method_body: |-
          def initialize(self, *args, **kwargs):
              self.system = "A"
        propagation_mode: true
        topdown: true
        auto_rollback: true
        multiplexable_number: 1
        field_order: ascend
- category: Tutorial
  name: ShadowB
  persistence: true
  api_generation: false
  abstract: false
  extends:
    - Shadow
  attributes:
    local_fields: []
    ref_fields: []
  methods:
    class_methods: []
    instance_methods:
      - method_body: |-
          def initialize(self, *args, **kwargs):
              self.system = "B"
        propagation_mode: true
        topdown: true
        auto_rollback: true
        multiplexable_number: 1
        field_order: ascend

Orchestration

category: Tutorial
name: Orchestration
persistence: true
api_generation: true
abstract: false
attributes:
  local_fields:
    - field_name: system_a
      field_type: <AxisAtom.ShadowA>
      field_persistence: true
      field_nullable: false
      field_immutable: false
      field_unique: false
      field_default: atom.ShadowA(name=uuid.uuid1().hex)
    - field_name: system_b
      field_type: <AxisAtom.ShadowB>
      field_persistence: true
      field_nullable: false
      field_immutable: false
      field_unique: false
      field_default: atom.ShadowB(name=uuid.uuid1().hex)
    - field_name: status
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_fsm:
        created:
          execution_method: create
          failure_transition: deleted
        deleted:
          execution_method: delete
      field_metadata:
        POST: false
        PUT: false
  ref_fields: []
  identifier:
    field_name: id
    field_type: string
    field_persistence: true
    field_immutable: true
    field_metadata:
      POST: false
      PUT: false
methods:
  class_methods: []
  instance_methods:
    - method_body: |-
        def initialize(self, *args, **kwargs):
            if not self.id:
                self.id = uuid.uuid1().hex
      propagation_mode: false
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        async def create(self, *args, **kwargs):
            pass
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        async def delete(self, *args, **kwargs):
            pass
      propagation_mode: true
      topdown: false
      auto_rollback: true
      multiplexable_number: 1
      field_order: descend


ATOMの作成が完了したら、REPLでOrchestrationインスタンスを作成し、createメソッドとdeleteメソッドをテストしましょう。メソッドの各ポイントには動作状況が把握しやすいようデバッグプリント文を挿入してあるため以下のような出力が確認できます。


ATOMのメソッド伝搬によってOrchestrationcreateメソッド呼び出しが包含されているオブジェクトに対して伝搬されます。create時の伝搬は包含フィールドの昇順に設定していますのでAからBの順に伝搬します。delete時の伝搬は降順に設定していますのでBからAの順に伝搬します。

>>> o = atom.Orchestration()↵
... await o.create()↵
... ↵
作成中...A
作成完了...A
作成中...B
作成完了...B
↵
>>> await o.delete()↵
... ↵
削除要否判定...B
削除中...B
削除完了...B
削除要否判定...A
削除中...A
削除完了...A
↵
>>>


ロールバックをテストする場合は、API Gatewayで/apis/systemBsのAPIルーティングを閉塞してからcreateメソッドを実行すると良いでしょう。
Routing Table画面で該当のルーティングに対してChange State to OUSを選択することで閉塞できます。 閉塞状態でcreateを実行すると以下のように出力されます。

>>> o = atom.Orchestration()↵
... await o.create()↵
... ↵
作成中...A
作成完了...A
作成中...B
作成失敗...B
HTTP 503: HTTP 503: OutOfService
>>> transactions↵
bb7971f63dbf11eba640acde48001122 T3JjaGVzdHJhdGlvbjpiYjc5MzBmNjNkYmYxMWViYTY0MGFjZGU0ODAwMTEyMg== Aborted
>>> select * from SystemA;↵
| instance | xid | xname | name | description |
U3lzdGVtQTpiYjdlOTVlNjNkYmYxMWViYTY0MGFjZGU0ODAwMTEyMg== NULL NULL bb7914183dbf11eba640acde48001122 NULL
>>> select * from SystemB;↵
Empty set
>>>


Routing Table画面で該当のルーティングに対してChange State to INSを選択することで閉塞を解除します。
API閉塞を解除したら、Transaction Table画面でAbortedとなっている該当トランザクションをcancelしてください。
以下のようにロールバックされることが確認できます。

>>> transactions↵
bb7971f63dbf11eba640acde48001122 T3JjaGVzdHJhdGlvbjpiYjc5MzBmNjNkYmYxMWViYTY0MGFjZGU0ODAwMTEyMg== Cancelled
>>> select * from SystemA;↵
Empty set
>>> select * from SystemB;↵
Empty set
>>>