シナリオでのFanout/Faninの方法を教えてください



シナリオで複数のマイクロサービスを並列に非同期に呼び出し、各マイクロサービスからのAPIコールバックを待ち合わせて処理を再開するには、serveコマンドを利用します。Docs » チュートリアル » Compound Transactionでは、サブシナリオからのAPIコールバックを例に記載していますが、複数のAPIコールバックを待ち合わせる点において留意点が異なります。

以下のシーケンスを例としてサンプルアプリケーションを紹介します。


データモデルの設計

最初にリクエストオーダを管理するデータモデルと各マイクロサービスへのリクエスト状態を管理するデータモデルを設計します。
以下のクラス図のようにリクエストオーダをParallelCallbackOrderモデルで定義します。識別子としてorderIdを保持します。以降、このorderIdをトランザクション名として利用することとします。
各マイクロサービスへのリクエスト状態を管理するのは、ParallelCallbackBatchJobです。識別子としてjobIdを保持します。ParallelCallbackOrderは、各マイクロサービスへの呼び出し状態を管理するためにParallelCallbackBatchJobをリストで保持します。
また各マイクロサービスの動作を模擬するために固有のendpointと処理応答時間を持たせたBatchJobA、BatchJobB、BatchJobCを具象クラスとして定義します。


各種プラグインの作成

上述のシーケンスとデータモデルに従ってプラグインをサンプル実装していきます。

ParallelCallbackBatchJob抽象クラスのATOM定義

ParallelCallbackBatchJobクラスは以下のように定義します。状態としてprocessingもしくはcompletedを値として管理します。runメソッドが呼び出されると具象クラスが持つendpointに対してリクエストを送信します。
後述するメインシナリオに対して送信先からのAPIコールバックを行い、メインシナリオを再開するため、送信先にはorderIdとjobIdを渡しています。isCompletedメソッドは、処理が完了しているかをチェックするメソッドです。

category: example
persistence: true
abstract: true
api_generation: false
name: ParallelCallbackBatchJob
attributes:
  identifier:
    field_name: jobId
    field_type: string
    field_persistence: true
    field_immutable: true
  local_fields:
    - field_name: status
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_default: processing
      field_enum:
        - processing
        - completed
  ref_fields:
    - field_name: orderId
      field_type: string
      field_persistence: true
      field_unique: false
      ref_class: ParallelCallbackOrder
      ref_class_field: orderId
methods:
  class_methods: []
  instance_methods:
    - method_body: |-
        async def run(self):
            r = await callout(path=f"{self.endpoint}", method=POST, body=dict(jobId=self.jobId, orderId=self.orderId, timeout=self.timeout))
            if r.error:
                raise Error(r.code, reason="Failed")
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        def isCompleted(self):
            return True if self.status=="completed" else False
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend


BatchJobA具象クラスのATOM定義

BatchJobAクラスは以下のように定義します。固有のendpointとtimeoutを定義しています。

abstract: false
api_generation: false
attributes:
  local_fields:
    - field_default: /v1/batchJobA
      field_immutable: true
      field_name: endpoint
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_default: '30'
      field_immutable: false
      field_name: timeout
      field_nullable: true
      field_persistence: true
      field_type: integer
      field_unique: false
category: example
extends:
  - ParallelCallbackBatchJob
methods:
  class_methods: []
  instance_methods: []
name: BatchJobA
persistence: true


BatchJobB具象クラスのATOM定義

BatchJobBクラスは以下のように定義します。BatchJobA同様に固有のendpointとtimeoutを定義しています。

abstract: false
api_generation: false
attributes:
  local_fields:
    - field_default: /v1/batchJobB
      field_immutable: true
      field_name: endpoint
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_default: '20'
      field_immutable: false
      field_name: timeout
      field_nullable: true
      field_persistence: true
      field_type: integer
      field_unique: false
category: example
extends:
  - ParallelCallbackBatchJob
methods:
  class_methods: []
  instance_methods: []
name: BatchJobB
persistence: true


BatchJobC具象クラスのATOM定義

BatchJobCクラスは以下のように定義します。BatchJobA同様に固有のendpointとtimeoutを定義しています。

abstract: false
api_generation: false
attributes:
  local_fields:
    - field_default: /v1/batchJobC
      field_immutable: true
      field_name: endpoint
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_default: '10'
      field_immutable: false
      field_name: timeout
      field_nullable: true
      field_persistence: true
      field_type: integer
      field_unique: false
category: example
extends:
  - ParallelCallbackBatchJob
methods:
  class_methods: []
  instance_methods: []
name: BatchJobC
persistence: true


ParallelCallbackOrderクラスのATOM定義

ParallelCallbackOrderクラスは以下のように定義します。最初に定義したParallelCallbackBatchJobをリストで保持します。
isCompletedメソッドは、包含するマイクロサービスの処理状態が全て完了しているかをチェックするメソッドです。

abstract: false
api_generation: false
attributes:
  identifier:
    field_immutable: true
    field_name: orderId
    field_persistence: true
    field_type: string
  local_fields:
    - field_immutable: false
      field_name: jobs
      field_nullable: true
      field_persistence: true
      field_type: array<AxisAtom.ParallelCallbackBatchJob>
      field_unique: false
    - field_immutable: false
      field_name: parameters
      field_nullable: true
      field_persistence: true
      field_type: object
      field_unique: false
  ref_fields: []
category: example
methods:
  class_methods: []
  instance_methods:
    - auto_rollback: true
      field_order: ascend
      method_body: |-
        def isCompleted(self):
            for i in self.jobs:
                if not i.isCompleted():
                    return False
            return True
      multiplexable_number: 1
      propagation_mode: true
      topdown: true
name: ParallelCallbackOrder
persistence: true


Mockサービスのシナリオ定義

次に上述したParallelCallbackBatchJobのrunメソッドから呼び出される各マイクロサービスの動作を模擬するモックAPIを作成しておきます。
動作としては、リクエストを受け付けると入力されたtimeout時間処理をスリープしてメインシナリオにコールバックするだけです。
サービス毎にシナリオを作成しても良いですが、処理が全く同じなので一つのシナリオにadditional_pathsを設定して異なるエンドポイントを疑似しています。

- additional_paths:
    - /v1/batchJobB
    - /v1/batchJobC
  category: example
  commands:
    - command: request_validation
      kwargs:
        aspect_options:
          post:
            process: |-
              context.session.set_status(202)
              context.session.finish()
        body:
          properties:
            jobId:
              type: string
            orderId:
              type: string
            timeout:
              type: integer
          required:
            - jobId
            - orderId
            - timeout
          type: object
        headers:
          properties:
            Content-Type:
              enum:
                - application/json
              type: string
          required:
            - Content-Type
          type: object
      label: オーダ受付
    - command: script
      kwargs:
        code: |-
          await asyncio.sleep(context.request.body.timeout)
          await callout(path="/v1/parallelCallbacks/results",
                        method=POST,
                        body=context.request.body.dictionary)
      label: コールバック
  connect_timeout: 60
  global_variables: {}
  method: POST
  name: batchJob
  request_timeout: 60
  routing_auto_generation_mode: true
  routing_options:
    scope: local
  spec:
    response:
      normal:
        codes:
          - 200
  transaction:
    async: true
    auto_response: false
    enable: true
    xname: ''
  uri: /v1/batchJobA
  variable_groups: []


メインシナリオ定義

最後に実際にFanout/Faninを行うメインシナリオを作成します。
メインシナリオでは、リクエストを受付すると最初にヘッダとbodyをバリデーションします。bodyは適当にparametersというキーのobjectを受け入れるようにしていますが中身はDon't careです。
バリデーションを抜けるとParallelCallbackOrderインスタンスを生成してデータベースに永続化して要求元にAccepted応答を返却します。次にParallelCallbackOrderインスタンスが保持するjobに対して並列でrunメソッドを呼び出してFanoutします。このブロックを抜けるとトランザクションは自動的にsuspend状態となります。
各マイクロサービスからのAPIコールバックを受信するとserveコマンドが実行されます。serveコマンドでは受信したコールバック情報からjobインスタンスをローディングして状態を更新します。この時、全てのマイクロサービスのjobが終了していればグローバル変数faninをTrueに設定してserveコマンドを抜けます。Falseの場合は、suspend状態を継続します。

- additional_paths: []
  category: example
  commands:
    - command: request_validation
      kwargs:
        body:
          properties:
            parameters:
              type: object
          required:
            - parameters
          type: object
        headers:
          properties:
            Content-Type:
              enum:
                - application/json
              type: string
          required:
            - Content-Type
          type: object
      label: オーダ受付
    - command: script
      kwargs:
        code: |-
          order = atom.ParallelCallbackOrder(orderId=context.qmonus.xname,
                                             jobs=[atom.BatchJobA(jobId=uuid.uuid1().hex),
                                                   atom.BatchJobB(jobId=uuid.uuid1().hex),
                                                   atom.BatchJobC(jobId=uuid.uuid1().hex)],
                                             parameters=context.request.body.parameters.dictionary)
          await order.save()
          context.session.set_status(202)
          context.session.finish(order.dictionary)

          await asyncio.gather(*[i.run() for i in order.jobs])
      label: ファンアウト
    - command: serve
      kwargs:
        aspect_options:
          pre:
            process: |-
              """callbackを受信したらジョブの状態を更新します
              """
              batchJob = await atom.ParallelCallbackBatchJob.load(context.request.body.jobId)
              await batchJob.save(status="completed")

              """オーダ情報を取得して全てのジョブが完了していればfaninを真に設定
              """
              order = await atom.ParallelCallbackOrder.load(context.request.body.orderId)
              fanin = True if order.isCompleted() else False
        continue_condition: fanin==False
        deadline: 540000
        method: POST
        path: /v1/parallelCallbacks/results
        xname_key: orderId
      label: ファンイン
    - command: script
      kwargs:
        code: print("All BatchJob Completed!!")
      label: 終了
  connect_timeout: 60
  global_variables:
    fanin:
      description: ''
      initial: false
  method: POST
  name: parallelCallbacks
  request_timeout: 60
  routing_auto_generation_mode: true
  routing_options:
    scope: local
  spec:
    response:
      normal:
        codes:
          - 200
  transaction:
    async: true
    auto_response: false
    enable: true
  uri: /v1/parallelCallbacks
  variable_groups: []


Warning

上記のサンプルアプリケーションを動作させる場合は、APIコールバックで利用する/v1/parallelCallbacks/resultsのルーティングをAPIGWに設定することを忘れないでください。
また、APIコールバックが同時に来る場合の排他制御を考慮する場合は追加するAPIコールバックのルーティングパイプラインのQueuingオプションでserialを有効化してください。