サーバレスにアプリケーションを実行する方法を教えてください
Qmonus SDKは、FunctionプラグインをSDKが動作しているコンテナとは別のコンテナを動的に起動してサーバレスに処理させることができます。
具体的には、Kubernetes Jobを生成してSDK Runnerを起動して実行します。SDK Runnerとは、Qmonus SDKで動作するScenarioやATOMなどのプラグイン、各種組込みオブジェクトを用いたスクリプトを単体実行するRunnerモードで起動されたSDKです。
以下の図は、SDKRunnerの外観を示しています。SDKRunnerが利用するメモリストアやデータベースは起動元のSDKから引き継がれ、各種プラグインが共有されます。
SDKRunner組込オブジェクトについて
SDKRunner組込オブジェクトを利用することで簡単にFunctionプラグインの実行を新たなコンテナにオフロードすることができます。マニフェストの組み立てやKubernetes Jobの完了を看取り、Kubernetesのリソースを削除する必要もありません。これらは、SDK側で管理されます。SDKRunnerの使い方を以下に解説します。
>>> runner = await SDKRunner.load()↵...①
... r = await runner.fizzbuzz(15)↵...②
... print(r)↵
... ↵
↵
dc258f8ec9a611ebb2f11691ffd6f87e
>>> r = await runner.waitfor()↵...③
... print(r)↵
... ↵
↵
succeeded
>>> logs = await runner.logs()↵...④
... print(logs)↵
... ↵
↵
{
"jobdc258f8ec9a611ebb2f11691ffd6f87e-jxn9s.containerdc258f8ec9a611ebb2f11691ffd6f87e": [
"[W 210610 04:46:58 monkey_patch:35] Extend resource limits...\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum core file size (RLIMIT_CORE ) : -1 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum CPU time (RLIMIT_CPU ) : -1 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum file size (RLIMIT_FSIZE ) : -1 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum heap size (RLIMIT_DATA ) : -1 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum stack size (RLIMIT_STACK ) : 8388608 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum resident set size (RLIMIT_RSS ) : -1 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum number of processes (RLIMIT_NPROC ) : -1 -1\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum number of open files (RLIMIT_NOFILE ) : 1048576 1048576\n",
"[W 210610 04:46:58 monkey_patch:57] Maximum lockable memory address (RLIMIT_MEMLOCK ) : 16777216 16777216\n",
"[I 210610 04:47:00 boot:109] \n",
" #==========================================================================#\n",
" # #\n",
" # Welcome to Qmonus v21.2LTS #\n",
" # ╔═╗ ┌┬┐┌─┐┌┐┌┬ ┬┌─┐ ╔═╗╔╦╗╦╔═ ╦═╗┬ ┬┌┐┌┌┐┌┌─┐┬─┐ ╔╦╗┌─┐┌┬┐┌─┐ #\n",
" # ║═╬╗││││ │││││ │└─┐───╚═╗ ║║╠╩╗ ╠╦╝│ │││││││├┤ ├┬┘ ║║║│ │ ││├┤ #\n",
" # ╚═╝╚┴ ┴└─┘┘└┘└─┘└─┘ ╚═╝═╩╝╩ ╩ ╩╚═└─┘┘└┘┘└┘└─┘┴└─ ╩ ╩└─┘─┴┘└─┘ #\n",
" # #\n",
" # Copyright(C) 2015-2021 NTT Communications, Inc. All rights reserved. #\n",
" # Author Shoji Hashimoto #\n",
" # #\n",
" #==========================================================================#\n",
"[I 210610 04:47:00 main:63] [1] Overridden Configurations\n",
" apigw = False\n",
" db-host = 'mysql'\n",
" default-apigw-host = 'apigw'\n",
" default-apigw-port = 9000\n",
" plugin-secret-key = 'None'\n",
" redis-local-host = 'redis'\n",
" runner-id = 'dc258f8ec9a611ebb2f11691ffd6f87e'\n",
" scenario = True\n",
" transaction = True\n",
"[W 210610 04:47:00 boot:504] [Limited] gRPC can not be used because the [grpcio, grpcio-tools, protobuf] module is not installed\n",
"[I 210610 04:47:00 boot:355] === Event Loop: <uvloop.Loop running=False closed=False debug=False> ===\n",
"[I 210610 04:47:00 context:3023] Redis connection pool regenerated\n",
"[I 210610 04:47:00 context:1065] [1] Created new database engine\n",
"[I 210610 04:47:00 context:1515] ===== Generated Credential x/szdZ+zUyxWbWCNq6FeMMrMq94260dFX17w79TcQ5d13zCbqp9GtABWpXx82/Kz3AutXBubAVnQ1AZANHqqjGEvgSHsXAc9EoBtIynPOswW8r3akHmMC+eWb311vW5bEbW3yES9I98/zheBunbGpH5THyKZwGk8ROB36G62mdjtpv/grlP8olNSYDECh9453WosZOvv5VixqgkparwAQ0CDWZworYRL+IYe7qAhIps= =====\n",
"[I 210610 04:47:00 context:2619] [1] Sync scenario start...\n",
"[I 210610 04:47:00 context:2625] [1] Sync scenario Done.\n",
"[I 210610 04:47:00 context:589] [pid:1] HealthCheckOperation ATOM class generated\n",
"[I 210610 04:47:00 context:854] Custom JSON format added: python <function python_format at 0x7f77b9765b90>\n",
"[I 210610 04:47:00 context:854] Custom JSON format added: cidr <function cidr at 0x7f77b9765b00>\n",
"[I 210610 04:47:00 context:854] Custom JSON format added: datetime-local <function datetime_local at 0x7f77b9765a70>\n",
"[I 210610 04:47:00 context:854] Custom JSON format added: identifier <function identifier at 0x7f77b97659e0>\n",
"[W 210610 04:47:00 modelschemas:56] HealthCheckOperation is already exists.\n",
"[I 210610 04:47:00 boot:104] {\"status\":200,\"method\":\"POST\",\"url\":\"/modelschemas?selfupdate=false\",\"remote\":\"127.0.0.1\",\"elapsed\":\"16.85ms\"}\n",
"[I 210610 04:47:00 context:589] [pid:1] HealthCheckResource ATOM class generated\n",
"[W 210610 04:47:00 modelschemas:56] HealthCheckResource is already exists.\n",
"[I 210610 04:47:00 boot:104] {\"status\":200,\"method\":\"POST\",\"url\":\"/modelschemas?selfupdate=false\",\"remote\":\"127.0.0.1\",\"elapsed\":\"6.46ms\"}\n",
"[I 210610 04:47:00 context:589] [pid:1] Example ATOM class generated\n",
"[W 210610 04:47:00 modelschemas:56] Example is already exists.\n",
"[I 210610 04:47:00 boot:104] {\"status\":200,\"method\":\"POST\",\"url\":\"/modelschemas?selfupdate=false\",\"remote\":\"127.0.0.1\",\"elapsed\":\"5.97ms\"}\n",
"[I 210610 04:47:00 context:2650] [1] Sync model start...\n",
"[W 210610 04:47:00 context:2667] [1] Detect create of HealthCheckResource table definition\n",
"[W 210610 04:47:00 context:2667] [1] Detect create of HealthCheckOperation table definition\n",
"[W 210610 04:47:00 context:2667] [1] Detect create of Example table definition\n",
"[I 210610 04:47:00 context:2688] creating...'HealthCheckResource'\n",
"[I 210610 04:47:00 context:2690] [1] Created HealthCheckResource table\n",
"[I 210610 04:47:00 context:2688] creating...'HealthCheckOperation'\n",
"[I 210610 04:47:00 context:2690] [1] Created HealthCheckOperation table\n",
"[I 210610 04:47:00 context:2688] creating...'Example'\n",
"[I 210610 04:47:00 context:2690] [1] Created Example table\n",
"[I 210610 04:47:00 context:2705] [1] Sync model Done.\n",
"[I 210610 04:47:00 context:399] \n",
" !=========================================================!\n",
" ! ╔═╗┌─┐┌─┐┌┬┐ ╔╗ ┌─┐┌─┐┌┬┐ ╔═╗┬ ┬┌─┐┌─┐┌─┐┌─┐┌┬┐┌─┐┌┬┐ !\n",
" ! ╠╣ ├─┤└─┐ │ ╠╩╗│ ││ │ │ ╚═╗│ ││ │ ├┤ ├┤ ││├┤ ││ !\n",
" ! ╚ ┴ ┴└─┘ ┴ ╚═╝└─┘└─┘ ┴ ╚═╝└─┘└─┘└─┘└─┘└─┘─┴┘└─┘─┴┘ !\n",
" !=========================================================!\n",
" \n",
"[I 210610 04:47:00 boot:345] Runner ID: dc258f8ec9a611ebb2f11691ffd6f87e\n",
" ===\n",
" return context.fizzbuzz(*[15], **{})\n",
" ===\n",
" \n",
"[I 210610 04:47:01 boot:348] Result:\n",
" FizzBuzz\n"
]
}
>>>
① SDKRunnerインスタンスを生成します。
Note
既存のインスタンスを取得する場合は、以下のように引数でRunner識別子を指定してください。
runner = await SDKRunner.load(<Runner識別子>)
全ての既存インスタンスをリスト取得する場合は、以下のようにretrieveメソッドを使用してください。
runners = await SDKRunner.retrieve()
② SDKRunnerインスタンスにはFunctionプラグインが自動的にバインドされます。同期関数の場合もawait
を指定して呼び出してください。返り値にはRunner識別子が返却されます。
③ SDKRunnerの完了を待ち合わせます。KubernetesのJobがsucceeded
かfailed
に遷移するのを待ち合わせます。引数にtimeoutを指定可能です。デフォルトは180
秒で設定されます。
④ SDKRunnerのPodログを取得します。RunnerのPodでエラーが発生した際は本ログで確認してください。
Note
Jobが終了した際、SDKは、Podの最終500行までのログを取得してJobとConfigMapを自動削除します。Jobのstatusとログはデータベースに格納され過去の情報を取得することもできます。
Warning
Qmonus Labから作成したSDKのプラグイン開発環境は、ユーザ環境単位にNamespaceが分離されますが、Nodeは共有リソースであり、一部のユーザが多量のコンテナを起動すると他の環境にパフォーマンス問題を引き起こす可能性があります。そのため、Namespaceには、デフォルトのResourceQuotaが設定されています。デフォルトの設定では、Pod数のhardリミットとしてSDKの必要Pod数+3
が設定されており、JobのPodとしては3つまで並行起動ができます。増やしたい場合はサポートまでご相談ください。
デザインパターンについて
SDKRunnerを用いてワークロードのPod分割する場合、入出力結果をチェインする様々な方式が考えられます。ここでは、主なデザインパターンを紹介します。
Callback
トランザクション機能を有効化したシナリオでserve
コマンドによるsuspend/resumeでワークフローを駆動するパターンです。serve
コマンドの1つ前のコマンドでSDK Runnerを起動し、serve
コマンドでコールバックを待ち受けます。serve
コマンドには、deadline
タイマーを設定することでPodからのコールバックタイムアウトを設定しておくことを推奨します。コールバックに必要なトランザクション名やコールバック先のエンドポイント情報は、ConfigMap経由で伝搬できます。
インタフェース | 実現手段 |
---|---|
Runner起動トリガー | シナリオのscript コマンド |
Runner完了待機 | シナリオのserve コマンド |
Runner起動パラメータ伝搬 | ConfigMap or Secret |
Runner完了トリガー | Runnerポッドからのcallout 発行 |
Runner処理結果伝搬 | Runnerポッドからのcallout のbody にセット |
サンプル
Callbackパタンのサンプルを掲載します。以下は、SDKRunnerで動作させるFunctionプラグインです。引数で渡されたリクエストパスにキーワード引数をcallbackするだけのものです。
category: example
name: callbackRunner
code: |-
async def callbackRunner(path, **kwargs):
r = await callout(path=path, method=POST, body=kwargs)
print(f"{r.code} {r.body}")
以下は、SDK側で動作させるScenarioプラグインです。SDKRunnerをforkしたら、suspendしてcallbackを待ち受けます。callbackを受信したら次のステップに進行して再度suspendしてcallbackを待ち受けていきます。
- category: example
name: callbackPattern
uri: /v1/callbackPatterns
method: GET
request_timeout: 60
connect_timeout: 60
additional_paths: []
routing_auto_generation_mode: true
routing_options:
scope: local
transaction:
enable: true
async: true
xname: ''
global_variables: {}
variable_groups: []
spec:
response:
normal:
codes:
- 200
commands:
- command: script
kwargs:
code: |-
runner = await SDKRunner.load()
await runner.callbackRunner("/v1/callbackPatterns/phase1", transactionName=context.qmonus.xname)
- command: serve
kwargs:
path: /v1/callbackPatterns/phase1
method: POST
xname_key: transactionName
aspect_options:
post:
process: |-
runner = await SDKRunner.load()
await runner.callbackRunner("/v1/callbackPatterns/phase2", transactionName=context.qmonus.xname)
- command: serve
kwargs:
path: /v1/callbackPatterns/phase2
method: POST
xname_key: transactionName
aspect_options:
post:
process: |-
runner = await SDKRunner.load()
await runner.callbackRunner("/v1/callbackPatterns/phase3", transactionName=context.qmonus.xname)
- command: serve
kwargs:
path: /v1/callbackPatterns/phase3
method: POST
xname_key: transactionName
最後はRoutingプラグインです。これはSDKRunnerからAPIGWにcallbackするためのプロキシ設定となります。
- domain: default
scope: secure
proxy:
scheme: 'http:'
path: '/v1/callbackPatterns/{phase}'
target:
scheme: 'http:'
path: '/v1/callbackPatterns/{phase}'
authorities:
- 'scenario:9000'
connect_timeout: 60
request_timeout: 60
動作確認
サンプルの動作確認はScenarioの実行画面で確認してください。
Event Hook
Runnerポッドで結果のフィードバックを行わず、Job実行監視タスクが自動発行するタスクイベントをフックするJob
プラグインでチェーンするパターンです。
Runnerポッドの異常によってイベント発行できないケースを想定したリカバリ手段を別途検討する必要があります。
インタフェース | 実現手段 |
---|---|
Runner起動トリガー | シナリオのscript コマンドやルーティングのforbidden_process |
Runner完了待機 | Job実行監視タスク |
Runner起動パラメータ伝搬 | ConfigMap or Secret |
Runner完了トリガー | Job実行監視タスクからの自動イベント発行 |
Runner処理結果伝搬 | Cloud SQLなどのデータストレージ |
Tip
SDK Runnerは、Job実行監視タスクによって自動で完了を検出します。Job実行監視タスクは、Kubernetes Jobの状態変化を検出すると以下のtopicのタスクイベントを自動発行します。以下のtopicをlistenするJobプラグインを設定しておくことでイベントフックパターンを実現できます。
SDKRunnerの状態がrunning
に遷移した時: sdk.runner.running
SDKRunnerの状態がsucceeded
に遷移した時: sdk.runner.succeeded
SDKRunnerの状態がfailed
に遷移した時: sdk.runner.failed
尚、SDKRunnerの生成時刻から10分を経過してもJobが稼働しない場合、強制削除扱いとなります。その場合は、 sdk.runner.forceDeleted
イベントが発行されます。
※ jobプラグインのtask.contentには、task.content.runner_id
とtask.content.status
のみが設定されます。
サンプル
Event Hookパタンのサンプルを掲載します。以下は、SDKRunnerで動作させるFunctionプラグインです。乱数で成功か失敗を判定するだけの処理を行います。
category: example
name: eventHookRunner
code: |-
async def eventHookRunner():
import random
result = random.randint(0, 1)
qprint(f"Runner={options.runner_id} Result={result}")
await asyncio.sleep(3)
if result:
raise Error(500)
次は、sdk.runner.succeeded
イベントをハンドリングするJobプラグインです。
- name: sdkRunnerSucceeded
topics:
- sdk.runner.succeeded
script: qprint("Finished because the runner was successful")
最後は、sdk.runner.failed
イベントをハンドリングするJobプラグインです。再度Runnerを実行します。
- name: sdkRunnerFailed
topics:
- sdk.runner.failed
script: |-
qprint("Retry because the runner failed...")
runner = await SDKRunner.load()
await runner.eventHookRunner()
動作確認
Event Hookはトレースがしにくいですが、qprint
で出力するようにしてdebugモードで実行することで処理が追跡しやすくなります。
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel']
debug channel connected
>>> runner = await SDKRunner.load()↵
... await runner.eventHookRunner()↵
... ↵
↵
Runner=sdk-runner-98fe5206cfda11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-cc0088d6cfda11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-01ec2626cfdb11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-37e4cf30cfdb11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-6df1bcb4cfdb11eb9bf9acde48001122 Result=0
Finished because the runner was successful
>>> debug(False)↵
debug channel disconnected
>>>
Periodic Polling
Runnerポッドが永続化した結果を、定周期のDaemonプラグインでデータベースを監視して検出するパターンです。単純にデータベースをポーリングするだけなのでサンプルは掲載しません。
インタフェース | 実現手段 |
---|---|
Runner起動トリガー | シナリオのscript コマンドやルーティングのforbidden_process |
Runner完了待機 | DaemonによるDB監視 |
Runner起動パラメータ伝搬 | ConfigMap or Secret |
Runner完了トリガー | - |
Runner処理結果伝搬 | Cloud SQLなどのデータストレージ |
Queue-Worker
RunnerポッドでFIFO
やmFIFO
に結果をキューイングしてWorker
プラグインで刈り取るパターンです。
Runnerポッドの異常によってキューイングできないケースを想定したリカバリ手段を別途検討する必要があります。Runner側ではキューイングするだけですのでDocs » Collector/Reflector » Worker
などを参考にしてください。
インタフェース | 実現手段 |
---|---|
Runner起動トリガー | シナリオのscript コマンドやルーティングのforbidden_process |
Runner完了待機 | Workerによるキューコンシューム |
Runner起動パラメータ伝搬 | ConfigMap or Secret |
Runner完了トリガー | RunnerポッドからのFIFO 、mFIFO などへのキューイング |
Runner処理結果伝搬 | Runnerポッドからのキューイング情報にセット |
持ち込みアプリケーション
持ち込みアプリケーションを駆動させたい場合、SDKRunnerとして駆動させるか、ご自身でKubernetesリソースを管理するプラグインを作成するかの2通りの方法があります。前者の場合は、マニフェストは、プラグイン開発者が作成し、その実行管理はSDKに委譲することができます。後者の場合は、APPENDIX>>Kubernetes連携
に記載されているKube
組込オブジェクトを利用してマニフェスト作成、リソースの状態判定、リソース削除の全てをプラグイン開発者自身で作成する必要があります。
Warning
持ち込みアプリケーションをプライベートのイメージレジストリからpullする場合は、ImagePullSecretsを設定しますが、セキュリティに配慮するため、必ずサポートまでご相談ください。
持ち込みアプリケーションをSDKRunnerで管理する方法
持ち込みアプリケーションのマニフェスト定義yamlもしくはマニフェストオブジェクトリストをSDKRunner組込オブジェクトのguest
メソッドに渡すことで実行できます。
以下のFunctionプラグインは、busyboxイメージで単純なpingを打ち結果をファイル出力したら起動元のSDKのAPIGWにファイルをPOSTします。その後、APIGWにPOSTされたファイルをScenarioサーバで読み取り標準出力に表示するサンプルです。
category: example
name: pinger
code: |-
async def pinger(target="localhost", count=10):
import os
from axis.boot.context import SystemContext
headers = SystemContext.getinstance().authentication_headers()
(key, token) = (headers["X-Xaas-Family-Key"], headers["X-Xaas-Auth-Token"])
NAMESPACE = os.environ["QMONUS_JOB_RUNNER_NAMESPACE"]
container = k8s.V1Container(name="pinger",
image="yauritux/busybox-curl",
command=["/bin/sh", "-c", f"ping -c {count} {target} | tee -i pinger.log;curl -H 'X-Xaas-Family-Key:{key}' -H 'X-Xaas-Auth-Token:{token}' -T pinger.log http://{options.default_apigw_host}:{options.default_apigw_port}/upload/pinger.log"])
template = k8s.V1PodTemplateSpec(spec=k8s.V1PodSpec(restart_policy="Never", containers=[container]))
spec = k8s.V1JobSpec(template=template, backoff_limit=0)
job = k8s.V1Job(api_version="batch/v1", kind="Job", metadata=k8s.V1ObjectMeta(name="pinger", namespace=NAMESPACE, labels=dict(product="QmonusSDK", capability="runner")), spec=spec)
runner = await SDKRunner.load()
r = await runner.guest(manifests=[job])
print(r)
r = await runner.waitfor()
print(r)
logs = await runner.logs()
print(logs)
print("Pinger Done.")
def streaming_callback(chunk):
print(chunk.decode("utf-8"))
await callout(path="/download/pinger.log", streaming_callback=streaming_callback)
APIGWからストリーミングでファイルをダウンロードするため以下のルーティングを登録しておいてください。
- scope: secure
domain: default
proxy:
scheme: 'http:'
path: '/download/{filename}'
authorization:
auth_mode: qmonus
target:
scheme: 'http:'
path: '/download/{filename}'
authorities:
- 0.0.0.0
request_forbidden_process: |-
import aiofiles
import os
if not resources.get("filename", None):
raise HTTPError(400, reason="Unspecified filename")
session.set_header("Content-Type", "application/octet-stream")
filepath = os.path.join(os.getcwd(), "upload", resources["filename"])
if not os.path.exists(filepath):
raise HTTPError(404, reason="File Not found %r" % resources["filename"])
async with aiofiles.open(filepath,"rb") as f:
while True:
data = await f.read(1024)
if not data:
break
session.write(data)
session.flush()
session.finish()
connect_timeout: 60
request_timeout: 60
実行すると以下のようにSDKRunnerによって持ち込みのマニフェストも管理されることが確認できます。
>>> await context.pinger()↵
... ↵
sdk-guest-runner-21498bb8cf1211ebbe229a6daf065b2a
succeeded
{
"pinger-zq7kl.pinger": [
"PING localhost (127.0.0.1): 56 data bytes\n",
"64 bytes from 127.0.0.1: seq=0 ttl=64 time=0.040 ms\n",
"64 bytes from 127.0.0.1: seq=1 ttl=64 time=0.041 ms\n",
"64 bytes from 127.0.0.1: seq=2 ttl=64 time=0.049 ms\n",
"64 bytes from 127.0.0.1: seq=3 ttl=64 time=0.051 ms\n",
"64 bytes from 127.0.0.1: seq=4 ttl=64 time=0.047 ms\n",
"64 bytes from 127.0.0.1: seq=5 ttl=64 time=0.036 ms\n",
"64 bytes from 127.0.0.1: seq=6 ttl=64 time=0.055 ms\n",
"64 bytes from 127.0.0.1: seq=7 ttl=64 time=0.053 ms\n",
"64 bytes from 127.0.0.1: seq=8 ttl=64 time=0.045 ms\n",
"64 bytes from 127.0.0.1: seq=9 ttl=64 time=0.041 ms\n",
"\n",
"--- localhost ping statistics ---\n",
"10 packets transmitted, 10 packets received, 0% packet loss\n",
"round-trip min/avg/max = 0.036/0.045/0.055 ms\n",
" % Total % Received % Xferd Average Speed Time Time Time Current\n",
" Dload Upload Total Spent Left Speed\n",
"\r 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0\r100 820 100 117 100 703 5191 31195 --:--:-- --:--:-- --:--:-- 70300\n",
"{\"address\": \"127.0.0.1\", \"port\": 9000, \"filepath\": \"/root/axis-stable/upload/pinger.log\", \"mime\": null, \"bytes\": 703}"
]
}
Pinger Done.
PING localhost (127.0.0.1): 56 data bytes
64 bytes from 127.0.0.1: seq=0 ttl=64 time=0.040 ms
64 bytes from 127.0.0.1: seq=1 ttl=64 time=0.041 ms
64 bytes from 127.0.0.1: seq=2 ttl=64 time=0.049 ms
64 bytes from 127.0.0.1: seq=3 ttl=64 time=0.051 ms
64 bytes from 127.0.0.1: seq=4 ttl=64 time=0.047 ms
64 bytes from 127.0.0.1: seq=5 ttl=64 time=0.036 ms
64 bytes from 127.0.0.1: seq=6 ttl=64 time=0.055 ms
64 bytes from 127.0.0.1: seq=7 ttl=64 time=0.053 ms
64 bytes from 127.0.0.1: seq=8 ttl=64 time=0.045 ms
64 bytes from 127.0.0.1: seq=9 ttl=64 time=0.041 ms
--- localhost ping statistics ---
10 packets transmitted, 10 packets received, 0% packet loss
round-trip min/avg/max = 0.036/0.045/0.055 ms
↵
>>>
Note
SDKRunnerのguest
インスタンスメソッドは、以下のように宣言されています。
async def guest(self, manifests=[], cluster=None, sync=False)
リモートのKubernetesクラスターに適用する場合は、cluster
を指定してください。sync=True
とした場合は、waitfor
を自動で呼び出すため、Jobの完了まで応答しません。
Tip
Kubernetes Job状態はSDKで看取りますが、KubernetesのTTL controllerが有効化され、Job Specの.spec.ttlSecondsAfterFinished
が設定されている場合は、自動でJob/Podが削除されるため、状況によっては最終状態を看取ることができません。.spec.ttlSecondsAfterFinished
は設定しないようにしてください。
持ち込みアプリケーションを自身で管理する方法
ここではサンプルアプリケーションとして、少し複雑なアプリケーションを題材としたいため、Qmonus SDK自体を持ち込みアプリケーションと仮定します。
Qmonus SDKをRunnerモードで起動することとし、QmonusスクリプトをConfigMapにInjectし、JobのContainerセクションでConfigMapのマウントとRunnerモードの起動パラメータを指定してマニフェストを組み立てます。マニフェストを適用したら、Jobのステータスを看取り成功したらJob/ConfigMapを削除します。失敗時はログ解析のため、リソースは残すという仕様で作成してみましょう。
Functionプラグインを作成する
ConfigMapとJobを生成するSDKプラグインをFunctionプラグインとして作成します。(特に制約はないのでAPIGWのルーティングプラグインやScenario、ATOMなどに記述することもできます)
Tip
Kubernetes Jobを作成するために、Namespaceとイメージが必要です。これらは、Qmonus SDKの場合は、Qmonus LabやValue StreamからデプロイされていればSDKのPodの環境変数から取得することができます。
Namespaceは、環境変数QMONUS_JOB_RUNNER_NAMESPACE
に設定されています。SDKイメージ名は、環境変数QMONUS_SDK_IMAGE_NAME
に設定されています。
インクラスターkubeオブジェクトについて
通常は、Appendix>>Kubernetes連携
に記載されている手順でリモートのkubernetesクラスタを作成して操作しますが、SDKのNamespaceを操作する場合はインクラスターKubeオブジェクトを取得すれば操作できます。インクラスターKubeオブジェクトは、Frontalからクラスター登録をする必要がなく、k = await Kube.load()
で取得できます。
Functionプラグインは以下のように定義してください。SDK Runnerの起動パラメータは、Runner側でAPIGW、Transaction、Scenarioを有効化していますが、Runnerから外部にcalloutするようなケースにおいて起動元のSDKが所属するAPIGWを利用したい場合は、--apigw=False
として--default_apigw_host=apigw
と指定する必要があります。
category: example
name: avator
code: |-
async def avator(name, script):
import os
k = await Kube.load()
# 環境変数QMONUS_JOB_RUNNER_NAMESPACEとQMONUS_SDK_IMAGE_NAMEはデフォルトで設定されています
NAMESPACE = os.environ["QMONUS_JOB_RUNNER_NAMESPACE"]
IMAGE = os.environ["QMONUS_SDK_IMAGE_NAME"]
# Runnerで実行するスクリプトをConfigMapにInjectする
configmap = k8s.V1ConfigMap(api_version="v1",
kind="ConfigMap",
data={name: script},
metadata=k8s.V1ObjectMeta(name=f"{name}-configmap", namespace=NAMESPACE))
r = await k.getNamespace(NAMESPACE).createConfigMap(configmap)
print(f"ConfigMap:{name}-configmap created")
# ConfigMapをマウントしてRunnerをJobで起動する
container = k8s.V1Container(name="qmonus-runner",
image=IMAGE,
image_pull_policy="Always",
args=[f"--run_script=/root/config/{name}",
f"--apigw=True",
f"--transaction=True",
f"--scenario=True",
f"--db_host={options.db_host}",
f"--redis_local_host={options.redis_local_host}"],
volume_mounts=[k8s.V1VolumeMount(mount_path="/root/config", name="config-volume")])
template = k8s.V1PodTemplateSpec(spec=k8s.V1PodSpec(restart_policy="Never",
containers=[container],
volumes=[k8s.V1Volume(name="config-volume",
config_map=k8s.V1ConfigMapVolumeSource(name=f"{name}-configmap"))]))
spec = k8s.V1JobSpec(template=template, backoff_limit=0)
job = k8s.V1Job(api_version="batch/v1", kind="Job", metadata=k8s.V1ObjectMeta(name=name, namespace=NAMESPACE), spec=spec)
r = await k.getNamespace(NAMESPACE).createJob(job)
status = None
while True:
r = await k.getNamespace(NAMESPACE).listJob(name)
if not status or status != r.status.to_dict():
status = r.status.to_dict()
print(f"Job:{name}\n{json.dumps(status, indent=4)}")
if r.status.succeeded or r.status.failed:
print(f"Job:{name} Done.\n{r.status}")
break
await asyncio.sleep(3)
if status["succeeded"]:
await k.getNamespace(NAMESPACE).deleteJob(name)
print(f"Job:{name} deleted")
await k.getNamespace(NAMESPACE).deleteConfigMap(f"{name}-configmap")
print(f"ConfigMap:{name}-configmap deleted")
elif status["failed"]:
print(f"Job execution error occurred")
動作確認
Functionプラグインを作成したので実行します。Functionプラグインavator
は、Jobの名前とQmonusスクリプトのテキストを引数で渡すことで動作します。
ここでは、チュートリアル>>Practical Test-Driven Development
で作成したテストケースをRunner Podで実行させます。テスト実行自体は、Runner Podで実行されますが、Runner PodはSDKと同じデータベースやメモリストアに接続されているため、テストの進捗は、SDK側のポータルで確認することができます。
RunnerのPodログは、FrontalのGUIから確認することができます。通常のSDKプラグインとは実行空間が異なるため、Runnerで動作させるスクリプトはSDKのPodで十分に動作確認してからJobでテストすることをお勧めします。
>>> await context.avator("test", "await Test.runall()")↵
... ↵
ConfigMap:test-configmap created
Job:test
{
"active": null,
"completion_time": null,
"conditions": null,
"failed": null,
"start_time": null,
"succeeded": null
}
Job:test
{
"active": 1,
"completion_time": null,
"conditions": null,
"failed": null,
"start_time": "2021-06-04T01:52:25+00:00",
"succeeded": null
}
Job:test
{
"active": null,
"completion_time": "2021-06-04T01:56:00+00:00",
"conditions": [
{
"last_probe_time": "2021-06-04T01:56:00+00:00",
"last_transition_time": "2021-06-04T01:56:00+00:00",
"message": null,
"reason": null,
"status": "True",
"type": "Complete"
}
],
"failed": null,
"start_time": "2021-06-04T01:52:25+00:00",
"succeeded": 1
}
Job:test Done.
{'active': None,
'completion_time': datetime.datetime(2021, 6, 4, 1, 56, tzinfo=tzlocal()),
'conditions': [{'last_probe_time': datetime.datetime(2021, 6, 4, 1, 56, tzinfo=tzlocal()),
'last_transition_time': datetime.datetime(2021, 6, 4, 1, 56, tzinfo=tzlocal()),
'message': None,
'reason': None,
'status': 'True',
'type': 'Complete'}],
'failed': None,
'start_time': datetime.datetime(2021, 6, 4, 1, 52, 25, tzinfo=tzlocal()),
'succeeded': 1}
Job:test deleted
↵
ConfigMap:test-configmap deleted
>>>