シナリオでのFanout/Faninの方法を教えてください
シナリオで複数のマイクロサービスを並列に非同期に呼び出し、各マイクロサービスからのAPIコールバックを待ち合わせて処理を再開するには、serve
コマンドを利用します。Docs » チュートリアル » Compound Transaction
では、サブシナリオからのAPIコールバックを例に記載していますが、複数のAPIコールバックを待ち合わせる点において留意点が異なります。
以下のシーケンスを例としてサンプルアプリケーションを紹介します。
データモデルの設計
最初にリクエストオーダを管理するデータモデルと各マイクロサービスへのリクエスト状態を管理するデータモデルを設計します。
以下のクラス図のようにリクエストオーダをParallelCallbackOrderモデルで定義します。識別子としてorderIdを保持します。以降、このorderIdをトランザクション名として利用することとします。
各マイクロサービスへのリクエスト状態を管理するのは、ParallelCallbackBatchJobです。識別子としてjobIdを保持します。ParallelCallbackOrderは、各マイクロサービスへの呼び出し状態を管理するためにParallelCallbackBatchJobをリストで保持します。
また各マイクロサービスの動作を模擬するために固有のendpointと処理応答時間を持たせたBatchJobA、BatchJobB、BatchJobCを具象クラスとして定義します。
各種プラグインの作成
上述のシーケンスとデータモデルに従ってプラグインをサンプル実装していきます。
ParallelCallbackBatchJob抽象クラスのATOM定義
ParallelCallbackBatchJobクラスは以下のように定義します。状態としてprocessing
もしくはcompleted
を値として管理します。run
メソッドが呼び出されると具象クラスが持つendpointに対してリクエストを送信します。
後述するメインシナリオに対して送信先からのAPIコールバックを行い、メインシナリオを再開するため、送信先にはorderIdとjobIdを渡しています。isCompleted
メソッドは、処理が完了しているかをチェックするメソッドです。
category: example
persistence: true
abstract: true
api_generation: false
name: ParallelCallbackBatchJob
attributes:
identifier:
field_name: jobId
field_type: string
field_persistence: true
field_immutable: true
local_fields:
- field_name: status
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
field_default: processing
field_enum:
- processing
- completed
ref_fields:
- field_name: orderId
field_type: string
field_persistence: true
field_unique: false
ref_class: ParallelCallbackOrder
ref_class_field: orderId
methods:
class_methods: []
instance_methods:
- method_body: |-
async def run(self):
r = await callout(path=f"{self.endpoint}", method=POST, body=dict(jobId=self.jobId, orderId=self.orderId, timeout=self.timeout))
if r.error:
raise Error(r.code, reason="Failed")
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- method_body: |-
def isCompleted(self):
return True if self.status=="completed" else False
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
BatchJobA具象クラスのATOM定義
BatchJobAクラスは以下のように定義します。固有のendpointとtimeoutを定義しています。
abstract: false
api_generation: false
attributes:
local_fields:
- field_default: /v1/batchJobA
field_immutable: true
field_name: endpoint
field_nullable: false
field_persistence: true
field_type: string
field_unique: false
- field_default: '30'
field_immutable: false
field_name: timeout
field_nullable: true
field_persistence: true
field_type: integer
field_unique: false
category: example
extends:
- ParallelCallbackBatchJob
methods:
class_methods: []
instance_methods: []
name: BatchJobA
persistence: true
BatchJobB具象クラスのATOM定義
BatchJobBクラスは以下のように定義します。BatchJobA同様に固有のendpointとtimeoutを定義しています。
abstract: false
api_generation: false
attributes:
local_fields:
- field_default: /v1/batchJobB
field_immutable: true
field_name: endpoint
field_nullable: false
field_persistence: true
field_type: string
field_unique: false
- field_default: '20'
field_immutable: false
field_name: timeout
field_nullable: true
field_persistence: true
field_type: integer
field_unique: false
category: example
extends:
- ParallelCallbackBatchJob
methods:
class_methods: []
instance_methods: []
name: BatchJobB
persistence: true
BatchJobC具象クラスのATOM定義
BatchJobCクラスは以下のように定義します。BatchJobA同様に固有のendpointとtimeoutを定義しています。
abstract: false
api_generation: false
attributes:
local_fields:
- field_default: /v1/batchJobC
field_immutable: true
field_name: endpoint
field_nullable: false
field_persistence: true
field_type: string
field_unique: false
- field_default: '10'
field_immutable: false
field_name: timeout
field_nullable: true
field_persistence: true
field_type: integer
field_unique: false
category: example
extends:
- ParallelCallbackBatchJob
methods:
class_methods: []
instance_methods: []
name: BatchJobC
persistence: true
ParallelCallbackOrderクラスのATOM定義
ParallelCallbackOrderクラスは以下のように定義します。最初に定義したParallelCallbackBatchJobをリストで保持します。
isCompleted
メソッドは、包含するマイクロサービスの処理状態が全て完了しているかをチェックするメソッドです。
abstract: false
api_generation: false
attributes:
identifier:
field_immutable: true
field_name: orderId
field_persistence: true
field_type: string
local_fields:
- field_immutable: false
field_name: jobs
field_nullable: true
field_persistence: true
field_type: array<AxisAtom.ParallelCallbackBatchJob>
field_unique: false
- field_immutable: false
field_name: parameters
field_nullable: true
field_persistence: true
field_type: object
field_unique: false
ref_fields: []
category: example
methods:
class_methods: []
instance_methods:
- auto_rollback: true
field_order: ascend
method_body: |-
def isCompleted(self):
for i in self.jobs:
if not i.isCompleted():
return False
return True
multiplexable_number: 1
propagation_mode: true
topdown: true
name: ParallelCallbackOrder
persistence: true
Mockサービスのシナリオ定義
次に上述したParallelCallbackBatchJobのrunメソッドから呼び出される各マイクロサービスの動作を模擬するモックAPIを作成しておきます。
動作としては、リクエストを受け付けると入力されたtimeout時間処理をスリープしてメインシナリオにコールバックするだけです。
サービス毎にシナリオを作成しても良いですが、処理が全く同じなので一つのシナリオにadditional_pathsを設定して異なるエンドポイントを疑似しています。
- additional_paths:
- /v1/batchJobB
- /v1/batchJobC
category: example
commands:
- command: request_validation
kwargs:
aspect_options:
post:
process: |-
context.session.set_status(202)
context.session.finish()
body:
properties:
jobId:
type: string
orderId:
type: string
timeout:
type: integer
required:
- jobId
- orderId
- timeout
type: object
headers:
properties:
Content-Type:
enum:
- application/json
type: string
required:
- Content-Type
type: object
label: オーダ受付
- command: script
kwargs:
code: |-
await asyncio.sleep(context.request.body.timeout)
await callout(path="/v1/parallelCallbacks/results",
method=POST,
body=context.request.body.dictionary)
label: コールバック
connect_timeout: 60
global_variables: {}
method: POST
name: batchJob
request_timeout: 60
routing_auto_generation_mode: true
routing_options:
scope: local
spec:
response:
normal:
codes:
- 200
transaction:
async: true
auto_response: false
enable: true
xname: ''
uri: /v1/batchJobA
variable_groups: []
メインシナリオ定義
最後に実際にFanout/Faninを行うメインシナリオを作成します。
メインシナリオでは、リクエストを受付すると最初にヘッダとbodyをバリデーションします。bodyは適当にparametersというキーのobjectを受け入れるようにしていますが中身はDon't careです。
バリデーションを抜けるとParallelCallbackOrderインスタンスを生成してデータベースに永続化して要求元にAccepted応答を返却します。次にParallelCallbackOrderインスタンスが保持するjobに対して並列でrunメソッドを呼び出してFanoutします。このブロックを抜けるとトランザクションは自動的にsuspend状態となります。
各マイクロサービスからのAPIコールバックを受信するとserveコマンドが実行されます。serveコマンドでは受信したコールバック情報からjobインスタンスをローディングして状態を更新します。この時、全てのマイクロサービスのjobが終了していればグローバル変数faninをTrueに設定してserveコマンドを抜けます。Falseの場合は、suspend状態を継続します。
- additional_paths: []
category: example
commands:
- command: request_validation
kwargs:
body:
properties:
parameters:
type: object
required:
- parameters
type: object
headers:
properties:
Content-Type:
enum:
- application/json
type: string
required:
- Content-Type
type: object
label: オーダ受付
- command: script
kwargs:
code: |-
order = atom.ParallelCallbackOrder(orderId=context.qmonus.xname,
jobs=[atom.BatchJobA(jobId=uuid.uuid1().hex),
atom.BatchJobB(jobId=uuid.uuid1().hex),
atom.BatchJobC(jobId=uuid.uuid1().hex)],
parameters=context.request.body.parameters.dictionary)
await order.save()
context.session.set_status(202)
context.session.finish(order.dictionary)
await asyncio.gather(*[i.run() for i in order.jobs])
label: ファンアウト
- command: serve
kwargs:
aspect_options:
pre:
process: |-
"""callbackを受信したらジョブの状態を更新します
"""
batchJob = await atom.ParallelCallbackBatchJob.load(context.request.body.jobId)
await batchJob.save(status="completed")
"""オーダ情報を取得して全てのジョブが完了していればfaninを真に設定
"""
order = await atom.ParallelCallbackOrder.load(context.request.body.orderId)
fanin = True if order.isCompleted() else False
continue_condition: fanin==False
deadline: 540000
method: POST
path: /v1/parallelCallbacks/results
xname_key: orderId
label: ファンイン
- command: script
kwargs:
code: print("All BatchJob Completed!!")
label: 終了
connect_timeout: 60
global_variables:
fanin:
description: ''
initial: false
method: POST
name: parallelCallbacks
request_timeout: 60
routing_auto_generation_mode: true
routing_options:
scope: local
spec:
response:
normal:
codes:
- 200
transaction:
async: true
auto_response: false
enable: true
uri: /v1/parallelCallbacks
variable_groups: []
Warning
上記のサンプルアプリケーションを動作させる場合は、APIコールバックで利用する/v1/parallelCallbacks/results
のルーティングをAPIGWに設定することを忘れないでください。
また、APIコールバックが同時に来る場合の排他制御を考慮する場合は追加するAPIコールバックのルーティングパイプラインのQueuingオプションでserial
を有効化してください。