Simple Transaction
本チュートリアルでは、トランザクションを使用したアプリケーション開発について学習します。次のシーケンスで、Qmonus SDKのプラグインアプリケーションは、2つの異なるエンドポイントのAPIサービスを呼び出します。最初の呼び出しで、システムAはリソースデータを作成します。システムBも2回目の呼び出しでリソースデータを作成しますが、これが失敗した場合は、システムAのデータを削除する必要があるようなケースを想定します。
アプリケーションが外界に与えた変化をデータベースに逐次蓄積し、ワークフローの進行を管理しながら、失敗に備える実装はかなり面倒です。Qmonus SDKのトランザクションアシストを利用することで以下のようなステートフルなワークフロー実装を簡潔に記述できます。
事前準備
トランザクションアシストされたアプリケーションを作成する前準備として、システムAとBのモックを作成しておきます。モックは、データのCRUDができれば何でも良いので今回は、ATOMを使用して簡単なデータモデルとAPIを自動生成しておくと良いでしょう。 以下の定義で2つのATOMを作成してください。
- category: Tutorial
name: SystemA
persistence: true
api_generation: true
abstract: false
attributes:
local_fields:
- field_name: name
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
- field_name: description
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
ref_fields: []
methods:
class_methods: []
instance_methods: []
- category: Tutorial
name: SystemB
persistence: true
api_generation: true
abstract: false
attributes:
local_fields:
- field_name: name
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
- field_name: description
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
ref_fields: []
methods:
class_methods: []
instance_methods: []
Note
ATOMを作成したら、生成されたAPIを確認しておきましょう。上記のATOM定義では、api_basepath
は指定していないので生成されるAPIは/apis/systemAs
のようなリクエストパスで定義されているはずです。
Scenario
で実装する
事前準備が完了したら、シナリオを作成しましょう。次のyaml定義でシナリオを作成してください。このシナリオでは、データをシステムAとBに順番に送信します。シナリオの最後にブレークポイントを設定して、完全なロールバックが達成できることを確認します。ブレークポイントは強制的にワークフローの進行を中断し、即時キャンセルが呼び出されるため、ロールバックの動作を確認できます。
category: Tutorial
name: SimpleTransaction
uri: /simpleTransactions
method: POST
commands:
- command: script
kwargs:
code: |-
r = await callout(path="/apis/systemAs",
method="POST",
body=dict(SystemA=dict(name=context.request.body.system_a.name,
description=context.request.body.system_a.description)))
if r.error:
raise Error(r.code, reason=r.error.__str__())
a = json.loads(r.body)
cancellation:
cancellable: true
actions:
- action_type: script
code: |-
if not a:
r = await callout(path="/apis/systemAs?name={}".format(context.request.body.system_a.name))
if r.error:
await context.qmonus.abort()
return
a = json.loads(r.body)
r = await callout(path="/apis/systemAs/{}".format(a["SystemA"]["instance"]), method="DELETE")
if r.error:
await context.qmonus.abort()
label: SystemA
- command: script
kwargs:
code: |-
r = await callout(path="/apis/systemBs",
method="POST",
body=dict(SystemB=dict(name=context.request.body.system_b.name,
description=context.request.body.system_b.description)))
if r.error:
raise Error(r.code, reason=r.error.__str__())
b = json.loads(r.body)
cancellation:
cancellable: true
actions:
- action_type: script
code: |-
if not b:
r = await callout(path="/apis/systemBs?name={}".format(context.request.body.system_b.name))
if r.error:
await context.qmonus.abort()
return
b = json.loads(r.body)
r = await callout(path="/apis/systemBs/{}".format(b["SystemB"]["instance"]), method="DELETE")
if r.error:
await context.qmonus.abort()
label: SystemB
- command: breakpoint
kwargs:
abort: true
immediate_cancel: true
request_timeout: 60
connect_timeout: 60
spec:
response:
normal:
codes:
- 200
request:
body:
type: object
required:
- system_a
- system_b
properties:
system_a:
type: object
properties:
name:
type: string
description:
type: string
required:
- name
system_b:
type: object
properties:
name:
type: string
description:
type: string
required:
- name
transaction:
enable: true
async: true
xname_use_counter: false
auto_begin: true
auto_response: true
xname: ''
lock:
lock_keys:
- context.request.body.systemA.name
- context.request.body.systemB.name
retry_count: 3
retry_interval: 1
callback_options:
url: ''
xdomain: Tutorial
xtype: Two-Phase Commit
auto_rollback: true
routing_auto_generation_mode: true
global_variables:
a:
initial: null
description: SystemA Instance variable
b:
initial: null
description: SystemB Instance variable
シナリオの作成が完了したら、デバッグ画面から実行してみてください。UIで実行するとワークフローの進捗がグラフィカルに確認できます。 POSTするデータは以下のように適当なもので構いません。
POST /simpleTransactions
{
"system_a": {
"name": "aaa"
},
"system_b": {
"name": "bbb"
}
}
ロールバックが確認できたら、breakpoint
コマンドのimmediate_cancel=False
に設定を変更して、再度実行してみましょう。
ワークフローが中断された状態で、ロックが残っていることを確認できます。ロックは、Transaction Monitor
のMutexes
タブで確認できます。
また、トランザクション管理画面からcancel
を実行すると、ロールバックできることを確認できます。
ATOMで実装する
オブジェクト指向の知識と経験が少し必要ですが、前述したシナリオと同等の処理をATOMを使用してオブジェクト指向で開発することもできます。
シナリオでは処理の流れを意識してワークフローを作成しました。これをオブジェクト指向で表現する場合、ワークフローをオブジェクトの状態遷移に置き換えて考えると、イメージしやすくなります。
モデリングとして、Orchestration
という概念をオブジェクト化してみます。Orchestration
は、システムAとBをセットにした概念として捉え、本オブジェクトが持つステートマシンでワークフローを表現します。システムAおよびBのCRUD-APIは、Shadow
というATOMでラップします。システムAもBもこのチュートリアルではエンドポイントが異なるだけですので振る舞いは、スーパークラスに実装して抽象化します。Orchestration
にはAとB2つのATOMを包含させます。
以下のyaml定義に従ってそれぞれのATOMを作成してください。
Shadow
category: Tutorial
name: Shadow
persistence: false
api_generation: false
abstract: true
attributes:
local_fields:
- field_name: system
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
field_enum:
- A
- B
field_metadata:
ignore: true
- field_name: description
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
field_metadata:
ignore: false
- field_name: origin
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
field_metadata:
ignore: true
ref_fields: []
identifier:
field_name: name
field_type: string
field_persistence: true
field_immutable: true
field_metadata:
ignore: false
methods:
class_methods: []
instance_methods:
- method_body: |-
async def create(self, *args, **kwargs):
print("作成中...%s" % self.system)
r = await callout(path="/apis/system{}s".format(self.system),
method=POST,
body={"System{}".format(self.system): self.localfields(ignore=False)})
if r.error:
print("作成失敗...%s" % self.system)
raise Error(r.code, reason=r.error.__str__())
self.origin = json.loads(r.body)["System{}".format(self.system)]["instance"]
print("作成完了...%s" % self.system)
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- method_body: |-
async def delete(self, *args, **kwargs):
print("削除要否判定...%s" % self.system)
if not self.origin:
r = await callout(path="/apis/system{}s?name={}".format(self.system, self.name))
if r.error:
print("削除要否判定失敗...%s" % self.system)
raise Error(r.code, reason=r.error.__str__())
payload = json.loads(r.body)
if len(payload)==0:
print("削除不要...%s" % self.system)
return
self.origin = payload[0]["System{}".format(self.system)]["instance"]
print("削除中...%s" % self.system)
r = await callout(path="/apis/system{}s/{}".format(self.system, self.origin), method=DELETE)
if r.error:
print("削除失敗...%s" % self.system)
raise Error(r.code, reason=r.error.__str__())
print("削除完了...%s" % self.system)
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
ShadowA and B
- category: Tutorial
name: ShadowA
persistence: true
api_generation: false
abstract: false
extends:
- Shadow
attributes:
local_fields: []
ref_fields: []
methods:
class_methods: []
instance_methods:
- method_body: |-
def initialize(self, *args, **kwargs):
self.system = "A"
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- category: Tutorial
name: ShadowB
persistence: true
api_generation: false
abstract: false
extends:
- Shadow
attributes:
local_fields: []
ref_fields: []
methods:
class_methods: []
instance_methods:
- method_body: |-
def initialize(self, *args, **kwargs):
self.system = "B"
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
Orchestration
category: Tutorial
name: Orchestration
persistence: true
api_generation: true
abstract: false
attributes:
local_fields:
- field_name: system_a
field_type: <AxisAtom.ShadowA>
field_persistence: true
field_nullable: false
field_immutable: false
field_unique: false
field_default: atom.ShadowA(name=uuid.uuid1().hex)
- field_name: system_b
field_type: <AxisAtom.ShadowB>
field_persistence: true
field_nullable: false
field_immutable: false
field_unique: false
field_default: atom.ShadowB(name=uuid.uuid1().hex)
- field_name: status
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
field_fsm:
created:
execution_method: create
failure_transition: deleted
deleted:
execution_method: delete
field_metadata:
POST: false
PUT: false
ref_fields: []
identifier:
field_name: id
field_type: string
field_persistence: true
field_immutable: true
field_metadata:
POST: false
PUT: false
methods:
class_methods: []
instance_methods:
- method_body: |-
def initialize(self, *args, **kwargs):
if not self.id:
self.id = uuid.uuid1().hex
propagation_mode: false
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- method_body: |-
async def create(self, *args, **kwargs):
pass
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- method_body: |-
async def delete(self, *args, **kwargs):
pass
propagation_mode: true
topdown: false
auto_rollback: true
multiplexable_number: 1
field_order: descend
ATOMの作成が完了したら、REPLでOrchestration
インスタンスを作成し、create
メソッドとdelete
メソッドをテストしましょう。メソッドの各ポイントには動作状況が把握しやすいようデバッグプリント文を挿入してあるため以下のような出力が確認できます。
ATOMのメソッド伝搬によってOrchestration
のcreate
メソッド呼び出しが包含されているオブジェクトに対して伝搬されます。create
時の伝搬は包含フィールドの昇順に設定していますのでAからBの順に伝搬します。delete
時の伝搬は降順に設定していますのでBからAの順に伝搬します。
>>> o = atom.Orchestration()↵
... await o.create()↵
... ↵
作成中...A
作成完了...A
作成中...B
作成完了...B
↵
>>> await o.delete()↵
... ↵
削除要否判定...B
削除中...B
削除完了...B
削除要否判定...A
削除中...A
削除完了...A
↵
>>>
ロールバックをテストする場合は、API Gatewayで/apis/systemBs
のAPIルーティングを閉塞してからcreate
メソッドを実行すると良いでしょう。
Routing Table
画面で該当のルーティングに対してChange State to OUS
を選択することで閉塞できます。
閉塞状態でcreate
を実行すると以下のように出力されます。
>>> o = atom.Orchestration()↵
... await o.create()↵
... ↵
作成中...A
作成完了...A
作成中...B
作成失敗...B
HTTP 503: HTTP 503: OutOfService
>>> transactions↵
bb7971f63dbf11eba640acde48001122 T3JjaGVzdHJhdGlvbjpiYjc5MzBmNjNkYmYxMWViYTY0MGFjZGU0ODAwMTEyMg== Aborted
>>> select * from SystemA;↵
| instance | xid | xname | name | description |
U3lzdGVtQTpiYjdlOTVlNjNkYmYxMWViYTY0MGFjZGU0ODAwMTEyMg== NULL NULL bb7914183dbf11eba640acde48001122 NULL
>>> select * from SystemB;↵
Empty set
>>>
Routing Table
画面で該当のルーティングに対してChange State to INS
を選択することで閉塞を解除します。
API閉塞を解除したら、Transaction Table
画面でAborted
となっている該当トランザクションをcancel
してください。
以下のようにロールバックされることが確認できます。
>>> transactions↵
bb7971f63dbf11eba640acde48001122 T3JjaGVzdHJhdGlvbjpiYjc5MzBmNjNkYmYxMWViYTY0MGFjZGU0ODAwMTEyMg== Cancelled
>>> select * from SystemA;↵
Empty set
>>> select * from SystemB;↵
Empty set
>>>