ファイルをアップロードする方法はありますか?



Qmonus SDKでは、ファイルアップロード専用APIとしてPUT /upload/{filepath}というエンドポイントを提供しています。
以下は、MacOSで2Gのダミーファイルを作成してcurlコマンドでQmonusにファイルアップロードする例です。

mkfile 2g dummy.bin
curl -T dummy.bin {endpoint}/upload/dummy.bin

Note

endpointは、Qmonus SDKが稼動しているマシンのエンドポイントを指定してください。


アップロード処理の前後に自動発行されるイベントについて

本アップロードAPIは、アップロード処理を開始する前と完了した後のそれぞれのタイミングでLambdaイベントを発行します。

APIGW.OnFileUploading イベント
アップロード処理を開始する前にAPIGW.OnFileUploadingイベントが発行されます。このイベントは、アップロードAPIの認可を独自拡張することを可能にするために発行されます。対応するLambdaイベントハンドラが登録されていない場合は、デフォルトのQmonus認可が適用されます。認可処理を独自拡張する場合は、対応するLambdaイベントハンドラを作成してください。
lambdaイベントに渡されるevent変数には、以下の情報が含まれています。

  • path: Qmonus SDKの起動ディレクトリからの相対ファイルパスが格納されています。上書きすることでAPIGW上に保存されるファイルパスをカスタマイズできます。
  • resources: リクエストパスに含まれる変数辞書が格納されています。
  • params: リクエストクエリ辞書オブジェクトが格納されています。
  • headers: リクエストヘッダの辞書オブジェクトが格納されています。

APIGW.OnFileUploaded イベント
アップロード処理を完了した後にAPIGW.OnFileUploadedイベントが発行されます。このイベントは、アップロードされたファイルを事後処理するトリガーとして発行されます。
アップロードされたファイルは、APIGWのローカルディスクに保存されます。
lambdaイベントに渡されるevent変数には、以下の情報が含まれています。

  • address: アップロードを処理したAPIGWのホストアドレスが格納されています。
  • port: アップロードを処理したAPIGWのポート番号が格納されています。
  • filepath: アップロードされたファイルの絶対ファイルパスが格納されています。
  • mime: アップロードされたファイルのContent-Typeが格納されています。
  • bytes: アップロードされたファイルのバイト数が格納されています。
  • path: Qmonus SDKの起動ディレクトリからの相対ファイルパスが格納されています。APIGW.OnFileUploadingイベントハンドラで上書きした場合は、反映されます。
  • resources: リクエストパスに含まれる変数辞書が格納されています。
  • params: リクエストクエリ辞書オブジェクトが格納されています。
  • headers: リクエストヘッダの辞書オブジェクトが格納されています。


チュートリアル

アップロードしたファイルをさらにAmazon S3にアップロードする単純なアプリケーションのサンプルを以下に記載します。 作成するプラグインは、アップロード処理前後のLambdaイベントハンドラとAPIGW上でファイルをS3に転送するルーティングです。



APIGW.OnFileUploadingを処理するLambdaイベントハンドラ
APIGW.OnFileUploadingイベントを処理するハンドラでは、event変数に格納されているリクエストヘッダ情報を認可してください。
認可エラーを返却する場合は、await handle.terminate(code=401, output="Unauthorized")のようにterminate関数を呼び出す必要があります。
以下のプラグイン例の最終行にある認可OKの場合のterminateは省略することもできます。

name: fileUploadingEventHandler
event: APIGW.OnFileUploading
script: |-
  async def fileUploadingEventHandler(event, handle):
      token = event.get("X-Auth-Token", None)
      if not token:
          return await handle.terminate(code=401, output="Unauthorized")

      """認可処理を記載してください
      認可したアカウント単位でディレクトリを作ってファイルを格納したい場合などは以下のようにevent変数のpathを上書きしてください。
      event["path"] = "/accountA"+event["path"]

      `PUT /upload/centos63.qcow2?imageID=d07831df-edc3-4817-9881-89141f9134c3`などのようにクエリを付与している場合は、
      event["params"]に{'imageId': ['d07831df-edc3-4817-9881-89141f9134c3']}のように格納されています。
      """
      await handle.terminate(code=200, output="Success")


APIGW.OnFileUploadedを処理するLambdaイベントハンドラ
APIGW.OnFileUploadedイベントを処理するハンドラでは、event変数に格納されているアップロード処理を実行したAPIGWのエンドポイントとファイルパスを明示的に指定してcalloutします。

name: fileUploadedEventHandler
event: APIGW.OnFileUploaded
script: |-
  async def fileUploadedEventHandler(event, handle):
      config = await get_service_config("handson", mu_conversion=True)
      await callout(endpoint="%s:%d" % (event["address"], event["port"]),
                    path="/v1/uploadToS3",
                    method=PUT,
                    body=dict(aws_access_key_id=config.aws_access_key_id,
                              aws_secret_access_key=config.aws_secret_access_key,
                              bucket="qmonus",
                              filepath=event["filepath"],
                              folder="uploadedFiles",
                              region=config.region))


Tip

lambdaイベントハンドラのプラグインファイルに上記のような鍵情報が含まれる場合は、秘匿化されるConfigプラグインに定義して参照する設計を推奨します。


Lambdaイベントハンドラから呼び出されるAPIGWのrouting定義

Lambdaイベントハンドラから呼び出されたら、ローカルディスクにある指定されたファイルをAmazon S3に保存するスクリプトをforbidden_processで記述しています。

- authorities:
    - 0.0.0.0
  proxy:
    authorization:
      auth_mode: axis
    path: /v1/uploadToS3
    scheme: 'http:'
  target:
    path: /v1/uploadToS3
    scheme: 'http:'
  request_forbidden_process: |-
    import os
    import aiobotocore
    if request.method != PUT:
        raise HTTPError(405)

    payload = MU(json.loads(request.body))
    key = "{}/{}".format(payload.folder, os.path.basename(payload.filepath[1:]))

    session = aiobotocore.get_session()
    async with session.create_client("s3", region_name=payload.region,
                                           aws_secret_access_key=payload.aws_secret_access_key,
                                           aws_access_key_id=payload.aws_access_key_id) as client:
        response = await client.list_buckets()
        for bucket in response["Buckets"]:
            if bucket["Name"] == payload.bucket:
                break
        else:
            logger.info("Creating %r S3 bucket..." % payload.bucket)
            await client.create_bucket(Bucket=payload.bucket, CreateBucketConfiguration=dict(LocationConstraint=payload.region))
            logger.info("Done.")

        with open(payload.filepath, "rb") as f:
            logger.info("Uploading...")
            start = time.time()
            resp = await client.put_object(Bucket=payload.bucket, Key=key, Body=f)
            elapsed = time.time()-start
            logger.info(f"Done. {elapsed:0.4} sec")

        os.remove(payload.filepath)
        return dict(elapsed=round(elapsed, 2))
  spec:
    - command: callout
      method: PUT
      schema:
        request:
          body:
            properties:
              aws_access_key_id:
                type: string
              aws_secret_access_key:
                type: string
              bucket:
                type: string
              filepath:
                type: string
              folder:
                type: string
              region:
                type: string
            required:
              - aws_access_key_id
              - aws_secret_access_key
              - region
              - bucket
              - folder
              - filepath
            type: object
          headers:
            properties:
              Content-Type:
                default: application/json
                type: string
            required:
              - Content-Type
            type: object
        response:
          normal:
            codes:
              - 200
      usage: uploadToS3

Warning

本サンプルを実行するには、aiobotocoreのインストールが必要です。

Tip

上記は、aiobotocoreを利用してファイルアップロードしていますが、自前でファイルストリームをアップロードする処理を記述する場合は注意が必要です。 Qmonus SDKのcalloutは、body_producerによるストリーム型のbodyに対応していません。
従って、標準でバンドルインストールされているaiohttpaiofilesを利用してください。以下はそれらを利用したforbidden_processの記述例です。

import os
import aiofiles
payload = MU(json.loads(request.body))

async with aiohttp.ClientSession() as session:
    async def body_producer(file_name=None):
        async with aiofiles.open(file_name, "rb") as f:
            chunk = await f.read(64*1024)
            while chunk:
                yield chunk
                chunk = await f.read(64*1024)

    url = ...ターゲットのURL
    headers = ...ヘッダ作成

    async with session.put(url, headers=headers, data=body_producer(file_name=payload.filepath)) as resp:
        return 200, await resp.text()


Tip

APIGWのローカルディスクにファイルを保存した場合の例を紹介しましたが、APIGWとLambdaで共有ディスクをマウントしている場合は、APIGW.OnFileUploadedのイベントハンドラの中でファイル転送することができます。


ファイルアップロードの待ち受けパスをカスタマイズする方法

前述した通り、ファイルアップロードAPIのデフォルト待受パスは、/upload/{filepath}ですが、起動パラメータ--file_upload_api_pathを指定することでカスタマイズすることができます。
例として以下のような起動パラメータを設定した場合、APIGW.OnFileUploadingイベントのpathは、/upload/{filename}となります。また{tenant_id}変数は、resources変数で参照できます。

--file_upload_api_path=/v1/tenants/{tenant_id}/images/{filename}/file

Warning

カスタムパスには、必ず{filename}を含むようにしてください。Qmonus SDKは、{filename}の位置に指定された文字列をファイル名と認識します。カスタムパスに{filename}が含まれていない場合は、uuidがファイル名に付与されます。