サーバレスにアプリケーションを実行する方法を教えてください



Qmonus SDKは、FunctionプラグインをSDKが動作しているコンテナとは別のコンテナを動的に起動してサーバレスに処理させることができます。
具体的には、Kubernetes Jobを生成してSDK Runnerを起動して実行します。SDK Runnerとは、Qmonus SDKで動作するScenarioやATOMなどのプラグイン、各種組込みオブジェクトを用いたスクリプトを単体実行するRunnerモードで起動されたSDKです。

以下の図は、SDKRunnerの外観を示しています。SDKRunnerが利用するメモリストアやデータベースは起動元のSDKから引き継がれ、各種プラグインが共有されます。


SDKRunner組込オブジェクトについて

SDKRunner組込オブジェクトを利用することで簡単にFunctionプラグインの実行を新たなコンテナにオフロードすることができます。マニフェストの組み立てやKubernetes Jobの完了を看取り、Kubernetesのリソースを削除する必要もありません。これらは、SDK側で管理されます。SDKRunnerの使い方を以下に解説します。

>>> runner = await SDKRunner.load()↵...①
... r = await runner.fizzbuzz(15)↵...②
... print(r)↵
... ↵
↵
dc258f8ec9a611ebb2f11691ffd6f87e
>>> r = await runner.waitfor()↵...③
... print(r)↵
... ↵
↵
succeeded
>>> logs = await runner.logs()↵...④
... print(logs)↵
... ↵
↵
{
    "jobdc258f8ec9a611ebb2f11691ffd6f87e-jxn9s.containerdc258f8ec9a611ebb2f11691ffd6f87e": [
        "[W 210610 04:46:58 monkey_patch:35] Extend resource limits...\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum core file size            (RLIMIT_CORE    ) :                   -1                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum CPU time                  (RLIMIT_CPU     ) :                   -1                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum file size                 (RLIMIT_FSIZE   ) :                   -1                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum heap size                 (RLIMIT_DATA    ) :                   -1                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum stack size                (RLIMIT_STACK   ) :              8388608                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum resident set size         (RLIMIT_RSS     ) :                   -1                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum number of processes       (RLIMIT_NPROC   ) :                   -1                   -1\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum number of open files      (RLIMIT_NOFILE  ) :              1048576              1048576\n",
        "[W 210610 04:46:58 monkey_patch:57] Maximum lockable memory address   (RLIMIT_MEMLOCK ) :             16777216             16777216\n",
        "[I 210610 04:47:00 boot:109] \n",
        "    #==========================================================================#\n",
        "    #                                                                          #\n",
        "    #   Welcome to Qmonus v21.2LTS                                             #\n",
        "    #   ╔═╗ ┌┬┐┌─┐┌┐┌┬ ┬┌─┐   ╔═╗╔╦╗╦╔═  ╦═╗┬ ┬┌┐┌┌┐┌┌─┐┬─┐  ╔╦╗┌─┐┌┬┐┌─┐      #\n",
        "    #   ║═╬╗││││ │││││ │└─┐───╚═╗ ║║╠╩╗  ╠╦╝│ │││││││├┤ ├┬┘  ║║║│ │ ││├┤       #\n",
        "    #   ╚═╝╚┴ ┴└─┘┘└┘└─┘└─┘   ╚═╝═╩╝╩ ╩  ╩╚═└─┘┘└┘┘└┘└─┘┴└─  ╩ ╩└─┘─┴┘└─┘      #\n",
        "    #                                                                          #\n",
        "    #   Copyright(C) 2015-2021 NTT Communications, Inc. All rights reserved.   #\n",
        "    #   Author Shoji Hashimoto                                                 #\n",
        "    #                                                                          #\n",
        "    #==========================================================================#\n",
        "[I 210610 04:47:00 main:63] [1] Overridden Configurations\n",
        "    apigw = False\n",
        "    db-host = 'mysql'\n",
        "    default-apigw-host = 'apigw'\n",
        "    default-apigw-port = 9000\n",
        "    plugin-secret-key = 'None'\n",
        "    redis-local-host = 'redis'\n",
        "    runner-id = 'dc258f8ec9a611ebb2f11691ffd6f87e'\n",
        "    scenario = True\n",
        "    transaction = True\n",
        "[W 210610 04:47:00 boot:504] [Limited] gRPC can not be used because the [grpcio, grpcio-tools, protobuf] module is not installed\n",
        "[I 210610 04:47:00 boot:355] === Event Loop: <uvloop.Loop running=False closed=False debug=False> ===\n",
        "[I 210610 04:47:00 context:3023] Redis connection pool regenerated\n",
        "[I 210610 04:47:00 context:1065] [1] Created new database engine\n",
        "[I 210610 04:47:00 context:1515] ===== Generated Credential x/szdZ+zUyxWbWCNq6FeMMrMq94260dFX17w79TcQ5d13zCbqp9GtABWpXx82/Kz3AutXBubAVnQ1AZANHqqjGEvgSHsXAc9EoBtIynPOswW8r3akHmMC+eWb311vW5bEbW3yES9I98/zheBunbGpH5THyKZwGk8ROB36G62mdjtpv/grlP8olNSYDECh9453WosZOvv5VixqgkparwAQ0CDWZworYRL+IYe7qAhIps= =====\n",
        "[I 210610 04:47:00 context:2619] [1] Sync scenario start...\n",
        "[I 210610 04:47:00 context:2625] [1] Sync scenario Done.\n",
        "[I 210610 04:47:00 context:589] [pid:1] HealthCheckOperation ATOM class generated\n",
        "[I 210610 04:47:00 context:854] Custom JSON format added: python <function python_format at 0x7f77b9765b90>\n",
        "[I 210610 04:47:00 context:854] Custom JSON format added: cidr <function cidr at 0x7f77b9765b00>\n",
        "[I 210610 04:47:00 context:854] Custom JSON format added: datetime-local <function datetime_local at 0x7f77b9765a70>\n",
        "[I 210610 04:47:00 context:854] Custom JSON format added: identifier <function identifier at 0x7f77b97659e0>\n",
        "[W 210610 04:47:00 modelschemas:56] HealthCheckOperation is already exists.\n",
        "[I 210610 04:47:00 boot:104] {\"status\":200,\"method\":\"POST\",\"url\":\"/modelschemas?selfupdate=false\",\"remote\":\"127.0.0.1\",\"elapsed\":\"16.85ms\"}\n",
        "[I 210610 04:47:00 context:589] [pid:1] HealthCheckResource ATOM class generated\n",
        "[W 210610 04:47:00 modelschemas:56] HealthCheckResource is already exists.\n",
        "[I 210610 04:47:00 boot:104] {\"status\":200,\"method\":\"POST\",\"url\":\"/modelschemas?selfupdate=false\",\"remote\":\"127.0.0.1\",\"elapsed\":\"6.46ms\"}\n",
        "[I 210610 04:47:00 context:589] [pid:1] Example ATOM class generated\n",
        "[W 210610 04:47:00 modelschemas:56] Example is already exists.\n",
        "[I 210610 04:47:00 boot:104] {\"status\":200,\"method\":\"POST\",\"url\":\"/modelschemas?selfupdate=false\",\"remote\":\"127.0.0.1\",\"elapsed\":\"5.97ms\"}\n",
        "[I 210610 04:47:00 context:2650] [1] Sync model start...\n",
        "[W 210610 04:47:00 context:2667] [1] Detect create of HealthCheckResource table definition\n",
        "[W 210610 04:47:00 context:2667] [1] Detect create of HealthCheckOperation table definition\n",
        "[W 210610 04:47:00 context:2667] [1] Detect create of Example table definition\n",
        "[I 210610 04:47:00 context:2688] creating...'HealthCheckResource'\n",
        "[I 210610 04:47:00 context:2690] [1] Created HealthCheckResource table\n",
        "[I 210610 04:47:00 context:2688] creating...'HealthCheckOperation'\n",
        "[I 210610 04:47:00 context:2690] [1] Created HealthCheckOperation table\n",
        "[I 210610 04:47:00 context:2688] creating...'Example'\n",
        "[I 210610 04:47:00 context:2690] [1] Created Example table\n",
        "[I 210610 04:47:00 context:2705] [1] Sync model Done.\n",
        "[I 210610 04:47:00 context:399] \n",
        "    !=========================================================!\n",
        "    ! ╔═╗┌─┐┌─┐┌┬┐  ╔╗ ┌─┐┌─┐┌┬┐  ╔═╗┬ ┬┌─┐┌─┐┌─┐┌─┐┌┬┐┌─┐┌┬┐ !\n",
        "    ! ╠╣ ├─┤└─┐ │   ╠╩╗│ ││ │ │   ╚═╗│ ││  │  ├┤ ├┤  ││├┤  ││ !\n",
        "    ! ╚  ┴ ┴└─┘ ┴   ╚═╝└─┘└─┘ ┴   ╚═╝└─┘└─┘└─┘└─┘└─┘─┴┘└─┘─┴┘ !\n",
        "    !=========================================================!\n",
        "    \n",
        "[I 210610 04:47:00 boot:345] Runner ID: dc258f8ec9a611ebb2f11691ffd6f87e\n",
        "    ===\n",
        "    return context.fizzbuzz(*[15], **{})\n",
        "    ===\n",
        "    \n",
        "[I 210610 04:47:01 boot:348] Result:\n",
        "    FizzBuzz\n"
    ]
}
>>>

① SDKRunnerインスタンスを生成します。

Note

既存のインスタンスを取得する場合は、以下のように引数でRunner識別子を指定してください。
runner = await SDKRunner.load(<Runner識別子>)
全ての既存インスタンスをリスト取得する場合は、以下のようにretrieveメソッドを使用してください。
runners = await SDKRunner.retrieve()

② SDKRunnerインスタンスにはFunctionプラグインが自動的にバインドされます。同期関数の場合もawaitを指定して呼び出してください。返り値にはRunner識別子が返却されます。

③ SDKRunnerの完了を待ち合わせます。KubernetesのJobがsucceededfailedに遷移するのを待ち合わせます。引数にtimeoutを指定可能です。デフォルトは180秒で設定されます。

④ SDKRunnerのPodログを取得します。RunnerのPodでエラーが発生した際は本ログで確認してください。

Note

Jobが終了した際、SDKは、Podの最終500行までのログを取得してJobとConfigMapを自動削除します。Jobのstatusとログはデータベースに格納され過去の情報を取得することもできます。

Warning

Qmonus Labから作成したSDKのプラグイン開発環境は、ユーザ環境単位にNamespaceが分離されますが、Nodeは共有リソースであり、一部のユーザが多量のコンテナを起動すると他の環境にパフォーマンス問題を引き起こす可能性があります。そのため、Namespaceには、デフォルトのResourceQuotaが設定されています。デフォルトの設定では、Pod数のhardリミットとしてSDKの必要Pod数+3が設定されており、JobのPodとしては3つまで並行起動ができます。増やしたい場合はサポートまでご相談ください。


デザインパターンについて

SDKRunnerを用いてワークロードのPod分割する場合、入出力結果をチェインする様々な方式が考えられます。ここでは、主なデザインパターンを紹介します。

Callback

トランザクション機能を有効化したシナリオでserveコマンドによるsuspend/resumeでワークフローを駆動するパターンです。serveコマンドの1つ前のコマンドでSDK Runnerを起動し、serveコマンドでコールバックを待ち受けます。serveコマンドには、deadlineタイマーを設定することでPodからのコールバックタイムアウトを設定しておくことを推奨します。コールバックに必要なトランザクション名やコールバック先のエンドポイント情報は、ConfigMap経由で伝搬できます。

インタフェース 実現手段
Runner起動トリガー シナリオのscriptコマンド
Runner完了待機 シナリオのserveコマンド
Runner起動パラメータ伝搬 ConfigMap or Secret
Runner完了トリガー Runnerポッドからのcallout発行
Runner処理結果伝搬 Runnerポッドからのcalloutbodyにセット


サンプル

Callbackパタンのサンプルを掲載します。以下は、SDKRunnerで動作させるFunctionプラグインです。引数で渡されたリクエストパスにキーワード引数をcallbackするだけのものです。

category: example
name: callbackRunner
code: |-
  async def callbackRunner(path, **kwargs):
      r = await callout(path=path, method=POST, body=kwargs)
      print(f"{r.code} {r.body}")


以下は、SDK側で動作させるScenarioプラグインです。SDKRunnerをforkしたら、suspendしてcallbackを待ち受けます。callbackを受信したら次のステップに進行して再度suspendしてcallbackを待ち受けていきます。

- category: example
  name: callbackPattern
  uri: /v1/callbackPatterns
  method: GET
  request_timeout: 60
  connect_timeout: 60
  additional_paths: []
  routing_auto_generation_mode: true
  routing_options:
    scope: local
  transaction:
    enable: true
    async: true
    xname: ''
  global_variables: {}
  variable_groups: []
  spec:
    response:
      normal:
        codes:
          - 200
  commands:
    - command: script
      kwargs:
        code: |-
          runner = await SDKRunner.load()
          await runner.callbackRunner("/v1/callbackPatterns/phase1", transactionName=context.qmonus.xname)
    - command: serve
      kwargs:
        path: /v1/callbackPatterns/phase1
        method: POST
        xname_key: transactionName
        aspect_options:
          post:
            process: |-
              runner = await SDKRunner.load()
              await runner.callbackRunner("/v1/callbackPatterns/phase2", transactionName=context.qmonus.xname)
    - command: serve
      kwargs:
        path: /v1/callbackPatterns/phase2
        method: POST
        xname_key: transactionName
        aspect_options:
          post:
            process: |-
              runner = await SDKRunner.load()
              await runner.callbackRunner("/v1/callbackPatterns/phase3", transactionName=context.qmonus.xname)
    - command: serve
      kwargs:
        path: /v1/callbackPatterns/phase3
        method: POST
        xname_key: transactionName


最後はRoutingプラグインです。これはSDKRunnerからAPIGWにcallbackするためのプロキシ設定となります。

- domain: default
  scope: secure
  proxy:
    scheme: 'http:'
    path: '/v1/callbackPatterns/{phase}'
  target:
    scheme: 'http:'
    path: '/v1/callbackPatterns/{phase}'
  authorities:
    - 'scenario:9000'
  connect_timeout: 60
  request_timeout: 60


動作確認

サンプルの動作確認はScenarioの実行画面で確認してください。


Event Chain

Runnerポッドでlambdaイベントを発行してチェーンしていくパターンです。

インタフェース 実現手段
Runner起動トリガー シナリオのscriptコマンドやルーティングのforbidden_process
Runner完了待機 -
Runner起動パラメータ伝搬 ConfigMap or Secret
Runner完了トリガー Runnerポッドからのlambda_event発行
Runner処理結果伝搬 Runnerポッドからのlambda_eventcontentにセット


サンプル

Event Chainパタンのサンプルを掲載します。以下は、SDKRunnerで動作させるFunctionプラグインです。lambdaイベントを発行するだけの処理を行います。

category: example
name: eventChainRunner
code: |-
  async def eventChainRunner(ttl=3):
      qprint(f"Start Runner: {options.runner_id} ttl={ttl}")
      eventId = await lambda_event("runnerCompleted", dict(ttl=ttl))


以下は、SDK側で動作するLambdaFunctionプラグインです。ttlを減算して1以上であれば再度SDKRunnerを起動します。

name: runnerCompleted
event: runnerCompleted
script: |-
  async def runnerCompleted(event, handle):
      event["ttl"]-=1
      qprint(f"Event receive: ttl={event['ttl']}")
      if event["ttl"]:
          runner = await SDKRunner.load()
          return await runner.eventChainRunner(event["ttl"])
      qprint("done.")


動作確認

Event Chainはトレースがしにくいですが、qprintで出力するようにしてdebugモードで実行することで処理が追跡しやすくなります。

>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>> runner = await SDKRunner.load()↵
... await runner.eventChainRunner()↵
... ↵
↵
Start Runner: sdk-runner-6affd3eacfd311eb9f2dacde48001122 ttl=3
Event receive: ttl=2
Start Runner: sdk-runner-99aae9f0cfd311eb9f2dacde48001122 ttl=2
Event receive: ttl=1
Start Runner: sdk-runner-ca4f13a6cfd311eb9f2dacde48001122 ttl=1
Event receive: ttl=0
done.
>>> debug(False)↵
debug channel disconnected
>>>


Event Hook

Runnerポッドで結果のフィードバックを行わず、Job実行監視タスクが自動発行するタスクイベントをフックするJobプラグインでチェーンするパターンです。
Runnerポッドの異常によってイベント発行できないケースを想定したリカバリ手段を別途検討する必要があります。

インタフェース 実現手段
Runner起動トリガー シナリオのscriptコマンドやルーティングのforbidden_process
Runner完了待機 Job実行監視タスク
Runner起動パラメータ伝搬 ConfigMap or Secret
Runner完了トリガー Job実行監視タスクからの自動イベント発行
Runner処理結果伝搬 Cloud SQLなどのデータストレージ


Tip

SDK Runnerは、Job実行監視タスクによって自動で完了を検出します。Job実行監視タスクは、Kubernetes Jobの状態変化を検出すると以下のtopicのタスクイベントを自動発行します。以下のtopicをlistenするJobプラグインを設定しておくことでイベントフックパターンを実現できます。
SDKRunnerの状態がrunningに遷移した時: sdk.runner.running
SDKRunnerの状態がsucceededに遷移した時: sdk.runner.succeeded
SDKRunnerの状態がfailedに遷移した時: sdk.runner.failed
尚、SDKRunnerの生成時刻から10分を経過してもJobが稼働しない場合、強制削除扱いとなります。その場合は、 sdk.runner.forceDeletedイベントが発行されます。
※ jobプラグインのtask.contentには、task.content.runner_idtask.content.statusのみが設定されます。


サンプル

Event Hookパタンのサンプルを掲載します。以下は、SDKRunnerで動作させるFunctionプラグインです。乱数で成功か失敗を判定するだけの処理を行います。

category: example
name: eventHookRunner
code: |-
  async def eventHookRunner():
      import random
      result = random.randint(0, 1)
      qprint(f"Runner={options.runner_id} Result={result}")
      await asyncio.sleep(3)
      if result:
          raise Error(500)


次は、sdk.runner.succeededイベントをハンドリングするJobプラグインです。

- name: sdkRunnerSucceeded
  topics:
    - sdk.runner.succeeded
  script: qprint("Finished because the runner was successful")


最後は、sdk.runner.failedイベントをハンドリングするJobプラグインです。再度Runnerを実行します。

- name: sdkRunnerFailed
  topics:
    - sdk.runner.failed
  script: |-
    qprint("Retry because the runner failed...")
    runner = await SDKRunner.load()
    await runner.eventHookRunner()


動作確認

Event Hookはトレースがしにくいですが、qprintで出力するようにしてdebugモードで実行することで処理が追跡しやすくなります。

>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel']
debug channel connected
>>> runner = await SDKRunner.load()↵
... await runner.eventHookRunner()↵
... ↵
↵
Runner=sdk-runner-98fe5206cfda11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-cc0088d6cfda11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-01ec2626cfdb11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-37e4cf30cfdb11eb9bf9acde48001122 Result=1
Retry because the runner failed...
Runner=sdk-runner-6df1bcb4cfdb11eb9bf9acde48001122 Result=0
Finished because the runner was successful
>>> debug(False)↵
debug channel disconnected
>>>


Periodic Polling

Runnerポッドが永続化した結果を、定周期のDaemonプラグインでデータベースを監視して検出するパターンです。単純にデータベースをポーリングするだけなのでサンプルは掲載しません。

インタフェース 実現手段
Runner起動トリガー シナリオのscriptコマンドやルーティングのforbidden_process
Runner完了待機 DaemonによるDB監視
Runner起動パラメータ伝搬 ConfigMap or Secret
Runner完了トリガー -
Runner処理結果伝搬 Cloud SQLなどのデータストレージ


Queue-Worker

RunnerポッドでFIFOmFIFOに結果をキューイングしてWorkerプラグインで刈り取るパターンです。
Runnerポッドの異常によってキューイングできないケースを想定したリカバリ手段を別途検討する必要があります。Runner側ではキューイングするだけですのでDocs » Collector/Reflector » Workerなどを参考にしてください。

インタフェース 実現手段
Runner起動トリガー シナリオのscriptコマンドやルーティングのforbidden_process
Runner完了待機 Workerによるキューコンシューム
Runner起動パラメータ伝搬 ConfigMap or Secret
Runner完了トリガー RunnerポッドからのFIFOmFIFOなどへのキューイング
Runner処理結果伝搬 Runnerポッドからのキューイング情報にセット


持ち込みアプリケーション

持ち込みアプリケーションを駆動させたい場合、SDKRunnerとして駆動させるか、ご自身でKubernetesリソースを管理するプラグインを作成するかの2通りの方法があります。前者の場合は、マニフェストは、プラグイン開発者が作成し、その実行管理はSDKに委譲することができます。後者の場合は、APPENDIX>>Kubernetes連携に記載されているKube組込オブジェクトを利用してマニフェスト作成、リソースの状態判定、リソース削除の全てをプラグイン開発者自身で作成する必要があります。

Warning

持ち込みアプリケーションをプライベートのイメージレジストリからpullする場合は、ImagePullSecretsを設定しますが、セキュリティに配慮するため、必ずサポートまでご相談ください。

持ち込みアプリケーションをSDKRunnerで管理する方法

持ち込みアプリケーションのマニフェスト定義yamlもしくはマニフェストオブジェクトリストをSDKRunner組込オブジェクトのguestメソッドに渡すことで実行できます。

以下のFunctionプラグインは、busyboxイメージで単純なpingを打ち結果をファイル出力したら起動元のSDKのAPIGWにファイルをPOSTします。その後、APIGWにPOSTされたファイルをScenarioサーバで読み取り標準出力に表示するサンプルです。

category: example
name: pinger
code: |-
  async def pinger(target="localhost", count=10):
      import os
      from axis.boot.context import SystemContext
      headers = SystemContext.getinstance().authentication_headers()
      (key, token) = (headers["X-Xaas-Family-Key"], headers["X-Xaas-Auth-Token"])

      NAMESPACE = os.environ["QMONUS_JOB_RUNNER_NAMESPACE"]
      container = k8s.V1Container(name="pinger",
                                  image="yauritux/busybox-curl",
                                  command=["/bin/sh", "-c", f"ping -c {count} {target} | tee -i pinger.log;curl -H 'X-Xaas-Family-Key:{key}' -H 'X-Xaas-Auth-Token:{token}' -T pinger.log http://{options.default_apigw_host}:{options.default_apigw_port}/upload/pinger.log"])
      template = k8s.V1PodTemplateSpec(spec=k8s.V1PodSpec(restart_policy="Never", containers=[container]))
      spec = k8s.V1JobSpec(template=template, backoff_limit=0)
      job = k8s.V1Job(api_version="batch/v1", kind="Job", metadata=k8s.V1ObjectMeta(name="pinger", namespace=NAMESPACE), spec=spec)

      runner = await SDKRunner.load()
      r = await runner.guest(manifests=[job])
      print(r)
      r = await runner.waitfor()
      print(r)
      logs = await runner.logs()
      print(logs)
      print("Pinger Done.")

      def streaming_callback(chunk):
          print(chunk.decode("utf-8"))

      await callout(path="/download/pinger.log", streaming_callback=streaming_callback)


APIGWからストリーミングでファイルをダウンロードするため以下のルーティングを登録しておいてください。

- scope: secure
  domain: default
  proxy:
    scheme: 'http:'
    path: '/download/{filename}'
    authorization:
      auth_mode: qmonus
  target:
    scheme: 'http:'
    path: '/download/{filename}'
  authorities:
    - 0.0.0.0
  request_forbidden_process: |-
    import aiofiles
    import os
    if not resources.get("filename", None):
        raise HTTPError(400, reason="Unspecified filename")

    session.set_header("Content-Type", "application/octet-stream")

    filepath = os.path.join(os.getcwd(), "upload", resources["filename"])
    if not os.path.exists(filepath):
        raise HTTPError(404, reason="File Not found %r" % resources["filename"])

    async with aiofiles.open(filepath,"rb") as f:
        while True:
            data = await f.read(1024)
            if not data:
                break
            session.write(data)
            session.flush()
        session.finish()
  connect_timeout: 60
  request_timeout: 60


実行すると以下のようにSDKRunnerによって持ち込みのマニフェストも管理されることが確認できます。

>>> await context.pinger()↵
... ↵
sdk-guest-runner-21498bb8cf1211ebbe229a6daf065b2a
succeeded
{
    "pinger-zq7kl.pinger": [
        "PING localhost (127.0.0.1): 56 data bytes\n",
        "64 bytes from 127.0.0.1: seq=0 ttl=64 time=0.040 ms\n",
        "64 bytes from 127.0.0.1: seq=1 ttl=64 time=0.041 ms\n",
        "64 bytes from 127.0.0.1: seq=2 ttl=64 time=0.049 ms\n",
        "64 bytes from 127.0.0.1: seq=3 ttl=64 time=0.051 ms\n",
        "64 bytes from 127.0.0.1: seq=4 ttl=64 time=0.047 ms\n",
        "64 bytes from 127.0.0.1: seq=5 ttl=64 time=0.036 ms\n",
        "64 bytes from 127.0.0.1: seq=6 ttl=64 time=0.055 ms\n",
        "64 bytes from 127.0.0.1: seq=7 ttl=64 time=0.053 ms\n",
        "64 bytes from 127.0.0.1: seq=8 ttl=64 time=0.045 ms\n",
        "64 bytes from 127.0.0.1: seq=9 ttl=64 time=0.041 ms\n",
        "\n",
        "--- localhost ping statistics ---\n",
        "10 packets transmitted, 10 packets received, 0% packet loss\n",
        "round-trip min/avg/max = 0.036/0.045/0.055 ms\n",
        "  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current\n",
        "                                 Dload  Upload   Total   Spent    Left  Speed\n",
        "\r  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0\r100   820  100   117  100   703   5191  31195 --:--:-- --:--:-- --:--:-- 70300\n",
        "{\"address\": \"127.0.0.1\", \"port\": 9000, \"filepath\": \"/root/axis-stable/upload/pinger.log\", \"mime\": null, \"bytes\": 703}"
    ]
}
Pinger Done.
PING localhost (127.0.0.1): 56 data bytes
64 bytes from 127.0.0.1: seq=0 ttl=64 time=0.040 ms
64 bytes from 127.0.0.1: seq=1 ttl=64 time=0.041 ms
64 bytes from 127.0.0.1: seq=2 ttl=64 time=0.049 ms
64 bytes from 127.0.0.1: seq=3 ttl=64 time=0.051 ms
64 bytes from 127.0.0.1: seq=4 ttl=64 time=0.047 ms
64 bytes from 127.0.0.1: seq=5 ttl=64 time=0.036 ms
64 bytes from 127.0.0.1: seq=6 ttl=64 time=0.055 ms
64 bytes from 127.0.0.1: seq=7 ttl=64 time=0.053 ms
64 bytes from 127.0.0.1: seq=8 ttl=64 time=0.045 ms
64 bytes from 127.0.0.1: seq=9 ttl=64 time=0.041 ms

--- localhost ping statistics ---
10 packets transmitted, 10 packets received, 0% packet loss
round-trip min/avg/max = 0.036/0.045/0.055 ms
↵
>>>


Note

SDKRunnerのguestインスタンスメソッドは、以下のように宣言されています。
async def guest(self, manifests=[], cluster=None, sync=False)
リモートのKubernetesクラスターに適用する場合は、clusterを指定してください。sync=Trueとした場合は、waitforを自動で呼び出すため、Jobの完了まで応答しません。

Tip

Kubernetes Job状態はSDKで看取りますが、KubernetesのTTL controllerが有効化され、Job Specの.spec.ttlSecondsAfterFinishedが設定されている場合は、自動でJob/Podが削除されるため、状況によっては最終状態を看取ることができません。.spec.ttlSecondsAfterFinishedは設定しないようにしてください。


持ち込みアプリケーションを自身で管理する方法

ここではサンプルアプリケーションとして、少し複雑なアプリケーションを題材としたいため、Qmonus SDK自体を持ち込みアプリケーションと仮定します。
Qmonus SDKをRunnerモードで起動することとし、QmonusスクリプトをConfigMapにInjectし、JobのContainerセクションでConfigMapのマウントとRunnerモードの起動パラメータを指定してマニフェストを組み立てます。マニフェストを適用したら、Jobのステータスを看取り成功したらJob/ConfigMapを削除します。失敗時はログ解析のため、リソースは残すという仕様で作成してみましょう。

Functionプラグインを作成する

ConfigMapとJobを生成するSDKプラグインをFunctionプラグインとして作成します。(特に制約はないのでAPIGWのルーティングプラグインやScenario、ATOMなどに記述することもできます)

Tip

Kubernetes Jobを作成するために、Namespaceとイメージが必要です。これらは、Qmonus SDKの場合は、Qmonus LabやValue StreamからデプロイされていればSDKのPodの環境変数から取得することができます。
Namespaceは、環境変数QMONUS_JOB_RUNNER_NAMESPACEに設定されています。SDKイメージ名は、環境変数QMONUS_SDK_IMAGE_NAMEに設定されています。

インクラスターkubeオブジェクトについて

通常は、Appendix>>Kubernetes連携に記載されている手順でリモートのkubernetesクラスタを作成して操作しますが、SDKのNamespaceを操作する場合はインクラスターKubeオブジェクトを取得すれば操作できます。インクラスターKubeオブジェクトは、Frontalからクラスター登録をする必要がなく、k = await Kube.load()で取得できます。


Functionプラグインは以下のように定義してください。SDK Runnerの起動パラメータは、Runner側でAPIGW、Transaction、Scenarioを有効化していますが、Runnerから外部にcalloutするようなケースにおいて起動元のSDKが所属するAPIGWを利用したい場合は、--apigw=Falseとして--default_apigw_host=apigwと指定する必要があります。

category: example
name: avator
code: |-
  async def avator(name, script):
      import os
      k = await Kube.load()

      # 環境変数QMONUS_JOB_RUNNER_NAMESPACEとQMONUS_SDK_IMAGE_NAMEはデフォルトで設定されています
      NAMESPACE = os.environ["QMONUS_JOB_RUNNER_NAMESPACE"]
      IMAGE = os.environ["QMONUS_SDK_IMAGE_NAME"]

      # Runnerで実行するスクリプトをConfigMapにInjectする
      configmap = k8s.V1ConfigMap(api_version="v1",
                                  kind="ConfigMap",
                                  data={name: script},
                                  metadata=k8s.V1ObjectMeta(name=f"{name}-configmap", namespace=NAMESPACE))
      r = await k.getNamespace(NAMESPACE).createConfigMap(configmap)
      print(f"ConfigMap:{name}-configmap created")

      # ConfigMapをマウントしてRunnerをJobで起動する
      container = k8s.V1Container(name="qmonus-runner",
                                  image=IMAGE,
                                  image_pull_policy="Always",
                                  args=[f"--run_script=/root/config/{name}",
                                        f"--apigw=True",
                                        f"--transaction=True",
                                        f"--scenario=True",
                                        f"--db_host={options.db_host}",
                                        f"--redis_local_host={options.redis_local_host}"],
                                  volume_mounts=[k8s.V1VolumeMount(mount_path="/root/config", name="config-volume")])

      template = k8s.V1PodTemplateSpec(spec=k8s.V1PodSpec(restart_policy="Never",
                                                          containers=[container],
                                                          volumes=[k8s.V1Volume(name="config-volume",
                                                                                config_map=k8s.V1ConfigMapVolumeSource(name=f"{name}-configmap"))]))

      spec = k8s.V1JobSpec(template=template, backoff_limit=0)
      job = k8s.V1Job(api_version="batch/v1", kind="Job", metadata=k8s.V1ObjectMeta(name=name, namespace=NAMESPACE), spec=spec)

      r = await k.getNamespace(NAMESPACE).createJob(job)
      status = None
      while True:
          r = await k.getNamespace(NAMESPACE).listJob(name)
          if not status or status != r.status.to_dict():
              status = r.status.to_dict()
              print(f"Job:{name}\n{json.dumps(status, indent=4)}")
          if r.status.succeeded or r.status.failed:
              print(f"Job:{name} Done.\n{r.status}")
              break
          await asyncio.sleep(3)

      if status["succeeded"]:
          await k.getNamespace(NAMESPACE).deleteJob(name)
          print(f"Job:{name} deleted")
          await k.getNamespace(NAMESPACE).deleteConfigMap(f"{name}-configmap")
          print(f"ConfigMap:{name}-configmap deleted")
      elif status["failed"]:
          print(f"Job execution error occurred")


動作確認

Functionプラグインを作成したので実行します。Functionプラグインavatorは、Jobの名前とQmonusスクリプトのテキストを引数で渡すことで動作します。
ここでは、チュートリアル>>Practical Test-Driven Developmentで作成したテストケースをRunner Podで実行させます。テスト実行自体は、Runner Podで実行されますが、Runner PodはSDKと同じデータベースやメモリストアに接続されているため、テストの進捗は、SDK側のポータルで確認することができます。
RunnerのPodログは、FrontalのGUIから確認することができます。通常のSDKプラグインとは実行空間が異なるため、Runnerで動作させるスクリプトはSDKのPodで十分に動作確認してからJobでテストすることをお勧めします。

>>> await context.avator("test", "await Test.runall()")↵
... ↵
ConfigMap:test-configmap created
Job:test
{
    "active": null,
    "completion_time": null,
    "conditions": null,
    "failed": null,
    "start_time": null,
    "succeeded": null
}
Job:test
{
    "active": 1,
    "completion_time": null,
    "conditions": null,
    "failed": null,
    "start_time": "2021-06-04T01:52:25+00:00",
    "succeeded": null
}
Job:test
{
    "active": null,
    "completion_time": "2021-06-04T01:56:00+00:00",
    "conditions": [
        {
            "last_probe_time": "2021-06-04T01:56:00+00:00",
            "last_transition_time": "2021-06-04T01:56:00+00:00",
            "message": null,
            "reason": null,
            "status": "True",
            "type": "Complete"
        }
    ],
    "failed": null,
    "start_time": "2021-06-04T01:52:25+00:00",
    "succeeded": 1
}
Job:test Done.
{'active': None,
 'completion_time': datetime.datetime(2021, 6, 4, 1, 56, tzinfo=tzlocal()),
 'conditions': [{'last_probe_time': datetime.datetime(2021, 6, 4, 1, 56, tzinfo=tzlocal()),
                 'last_transition_time': datetime.datetime(2021, 6, 4, 1, 56, tzinfo=tzlocal()),
                 'message': None,
                 'reason': None,
                 'status': 'True',
                 'type': 'Complete'}],
 'failed': None,
 'start_time': datetime.datetime(2021, 6, 4, 1, 52, 25, tzinfo=tzlocal()),
 'succeeded': 1}
Job:test deleted
↵
ConfigMap:test-configmap deleted
>>>