多重実行に対応していないバックエンド処理要求をキューイングして逐次処理する方法を教えてください
単純なキューイングによる逐次実行は、FIFO
組込オブジェクトで実現できます。以下のフローのようにAPIリクエストをAPIGWのrouting
でFIFO
に挿入してReflectorのworker
で刈り取り逐次処理を行うパターンがシンプルです。
サンプルアプリケーション
オーダ情報を管理するATOMを作成する
APIリクエストbodyをparameters
変数にそのまま代入するシンプルなデータモデルとします。status
フィールドで処理の進捗が分かるようにしておきます。
category: example
name: OrderEntry
persistence: true
abstract: false
api_generation: false
attributes:
identifier:
field_default: uuid.uuid1().hex
field_immutable: true
field_name: orderId
field_persistence: true
field_type: string
local_fields:
- field_default: dict()
field_immutable: false
field_name: parameters
field_nullable: false
field_persistence: true
field_type: object
field_unique: false
- field_default: standby
field_enum:
- standby
- processing
- done
- discarded
field_immutable: false
field_name: status
field_nullable: false
field_persistence: true
field_type: string
field_unique: false
ref_fields: []
methods:
class_methods: []
instance_methods: []
APIリクエストをオーダ情報に蓄積して処理オーダをFIFOに挿入するルーティングを作成する
POSTメソッドでアクセスされた場合は、OrderEntry
インスタンスを生成してデータベースに保存し、FIFO
に挿入します。
GETメソッドでアクセスされた場合は、OrderEntry
インスタンスの情報を返却します。
- domain: example
proxy:
authorization:
auth_mode: axis
path: '/v1/orders/{orderId}'
scheme: 'http:'
target:
path: '/v1/orders/{orderId}'
scheme: 'http:'
authorities:
- 'http://localhost:9099'
request_forbidden_process: |-
if request.method == POST:
order = atom.OrderEntry(parameters=json.loads(request.body))
await order.save()
await FIFO("orders").put(order.localfields())
return 202, order.localfields()
elif request.method == GET:
orderId = resources.get("orderId", None)
if orderId:
order = await atom.OrderEntry.load(orderId)
return 200, order.localfields()
else:
orders = await atom.OrderEntry.retrieve()
return 200, [i.localfields() for i in orders]
else:
raise HTTPError(405, reason="Method not allowed")
connect_timeout: 60
request_timeout: 60
FIFOを監視してオーダを検出したら任意の処理を実行するワーカーを作成する
FIFOからオーダ情報を取り出し、OrderEntryインスタンスをロードして状態を変化させます。任意の処理部はダミーのスリープを入れています。
category: example
mode: single
name: Dispatcher
script: |-
async def consume():
while Runtime.running():
async with FIFO("orders") as order:
if order is None:
return
entry = await atom.OrderEntry.load(order.orderId)
await entry.save(status="processing")
"""任意の処理を実行
"""
await asyncio.sleep(3)
await entry.save(status="done")
qprint(entry.yaml_format)
動作確認
REPLのデバッグモードで動作を確認します。
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel']
debug channel connected
>>> for i in range(5):↵
... r = await callout(path="/v1/orders/", method=POST, body={"name": "hoge%d" % i})↵
... print(r.code)↵
... ↵
202
202
202
202
202
OrderEntry:
instance: T3JkZXJFbnRyeTpiMTk2YTQ5Yzg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
orderId: b1969fc486bc11ebb200acde48001122
parameters:
name: hoge0
status: done
OrderEntry:
instance: T3JkZXJFbnRyeTpiMTk3ZmI0NDg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
orderId: b197f6bc86bc11ebb200acde48001122
parameters:
name: hoge1
status: done
OrderEntry:
instance: T3JkZXJFbnRyeTpiMTk5NWM2ZTg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
orderId: b199576e86bc11ebb200acde48001122
parameters:
name: hoge2
status: done
OrderEntry:
instance: T3JkZXJFbnRyeTpiMTlhOTk3Njg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
orderId: b19a94c686bc11ebb200acde48001122
parameters:
name: hoge3
status: done
OrderEntry:
instance: T3JkZXJFbnRyeTpiMTliZTRjYTg2YmMxMWViYjIwMGFjZGU0ODAwMTEyMg==
orderId: b19bde8086bc11ebb200acde48001122
parameters:
name: hoge4
status: done
>>> debug(False)↵
debug channel disconnected
>>>