多重実行に対応していないバックエンド処理要求をキューイングして逐次処理する方法を教えてください



単純なキューイングによる逐次実行は、FIFO組込オブジェクトで実現できます。以下のフローのようにAPIリクエストをAPIGWのroutingFIFOに挿入してReflectorのworkerで刈り取り逐次処理を行うパターンがシンプルです。


サンプルアプリケーション

オーダ情報を管理するATOMを作成する

APIリクエストbodyをparameters変数にそのまま代入するシンプルなデータモデルとします。statusフィールドで処理の進捗が分かるようにしておきます。

category: example
name: OrderEntry
persistence: true
abstract: false
api_generation: false
attributes:
  identifier:
    field_default: uuid.uuid1().hex
    field_immutable: true
    field_name: orderId
    field_persistence: true
    field_type: string
  local_fields:
    - field_default: dict()
      field_immutable: false
      field_name: parameters
      field_nullable: false
      field_persistence: true
      field_type: object
      field_unique: false
    - field_default: standby
      field_enum:
        - standby
        - processing
        - done
        - discarded
      field_immutable: false
      field_name: status
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
  ref_fields: []
methods:
  class_methods: []
  instance_methods: []


APIリクエストをオーダ情報に蓄積して処理オーダをFIFOに挿入するルーティングを作成する

POSTメソッドでアクセスされた場合は、OrderEntryインスタンスを生成してデータベースに保存し、FIFOに挿入します。
GETメソッドでアクセスされた場合は、OrderEntryインスタンスの情報を返却します。

- domain: example
  proxy:
    authorization:
      auth_mode: axis
    path: '/v1/orders/{orderId}'
    scheme: 'http:'
  target:
    path: '/v1/orders/{orderId}'
    scheme: 'http:'
  authorities:
    - 'http://localhost:9099'
  request_forbidden_process: |-
    if request.method == POST:
        order = atom.OrderEntry(parameters=json.loads(request.body))
        await order.save()
        await FIFO("orders").put(order.localfields())
        return 202, order.localfields()

    elif request.method == GET:
        orderId = resources.get("orderId", None)
        if orderId:
            order = await atom.OrderEntry.load(orderId)
            return 200, order.localfields()
        else:
            orders = await atom.OrderEntry.retrieve()
            return 200, [i.localfields() for i in orders]

    else:
        raise HTTPError(405, reason="Method not allowed")
  connect_timeout: 60
  request_timeout: 60


FIFOを監視してオーダを検出したら任意の処理を実行するワーカーを作成する

FIFOからオーダ情報を取り出し、OrderEntryインスタンスをロードして状態を変化させます。任意の処理部はダミーのスリープを入れています。

category: example
mode: single
name: Dispatcher
script: |-
  async def consume():
      while Runtime.running():
          async with FIFO("orders") as order:
              if order is None:
                  return
              entry = await atom.OrderEntry.load(order.orderId)
              await entry.save(status="processing")

              """任意の処理を実行
              """
              await asyncio.sleep(3)

              await entry.save(status="done")
              qprint(entry.yaml_format)


動作確認

REPLのデバッグモードで動作を確認します。

>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel']
debug channel connected
>>> for i in range(5):↵
...     r = await callout(path="/v1/orders/", method=POST, body={"name": "hoge%d" % i})↵
...     print(r.code)↵
... ↵
202
202
202
202
202
OrderEntry:
  instance: T3JkZXJFbnRyeTpiMTk2YTQ5Yzg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
  orderId: b1969fc486bc11ebb200acde48001122
  parameters:
    name: hoge0
  status: done
OrderEntry:
  instance: T3JkZXJFbnRyeTpiMTk3ZmI0NDg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
  orderId: b197f6bc86bc11ebb200acde48001122
  parameters:
    name: hoge1
  status: done
OrderEntry:
  instance: T3JkZXJFbnRyeTpiMTk5NWM2ZTg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
  orderId: b199576e86bc11ebb200acde48001122
  parameters:
    name: hoge2
  status: done
OrderEntry:
  instance: T3JkZXJFbnRyeTpiMTlhOTk3Njg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
  orderId: b19a94c686bc11ebb200acde48001122
  parameters:
    name: hoge3
  status: done
OrderEntry:
  instance: T3JkZXJFbnRyeTpiMTliZTRjYTg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
  orderId: b19bde8086bc11ebb200acde48001122
  parameters:
    name: hoge4
  status: done
>>> debug(False)↵
debug channel disconnected
>>>