Lambda Function

Lambda Functionサービスは、イベントチェインによる連鎖的な処理モデルを提供する機能です。Lambda Functionは、イベントによってトリガーされますが、処理の中でイベントを発行していくとそのチェインを管理することができます。

Note

タスクイベントによってトリガーされ、任意の処理を実行できるJobサービスと似て非なる機能です。Jobの処理の中で別のタスクを発行できますが、その発行元となったタスクと発行先のタスクはなんら関係を持たず独立しています。また、Jobとは異なり、Lambda Functionでは、イベントチェインが管理されているため、最終的な処理結果を受け取ることができます。

Job連鎖 Lambda Function連鎖


Lambda Functionの定義項目

  • event: Lambda Functionにディスパッチするイベント名を指定します。指定したイベント名のイベントが検出された場合にLambda Functionが発動します。


  • script: Lambda Functionが実行するカスタムスクリプトを実装します。カスタムスクリプトには1つの非同期関数のみ定義してください。関数名は自由に定義してください。引数はeventhandleの2つの変数が与えられます。eventには、イベント発行時に与えられる任意の辞書オブジェクトが格納されます。handleには、イベントチェインを操作する関数が格納されます。
# 関数定義例
async def helloLambda(event, handle):
    pass


  • success: Lambda Functionのカスタムスクリプトが正常終了した場合に連鎖発行するイベントを指定します。本項目はオプショナルです。


  • failure: Lambda Functionのカスタムスクリプトが異常終了(例外をスロー)した場合に連鎖発行するイベントを指定します。本項目はオプショナルです。


チュートリアル

はじめてのLambda Function

まずは簡単なLambda Functionを体験してみましょう。最初のチュートリアルでは、連鎖しない単発のイベントハンドラを作成してみます。
以下の定義でLambda Functionを作成してください。
Lambda Functionは、非同期関数で定義します。また、引数にはeventhandleが与えられます。event変数には、イベント発行時に与えられる任意の辞書オブジェクトが格納されます。handleにはイベントチェインに対する操作関数が格納されています。また、動作確認をREPLで行うため、実行空間を超えて処理状況が把握できるようqprint組込み関数を使用してデバッグチャネルに出力します。

event: hello
name: helloLambda
script: |-
  async def helloLambda(event, handle):
     qprint(event)
     await handle.terminate(output="Processed")


Tip

handleは、lamdbaイベントチェインに対する操作を行うコンテキストです。handle.terminateは出力結果を返却したい場合に利用します。次のチュートリアルで登場するhandle.chainは、連鎖イベントを発行する関数です。連鎖イベントではなく新規のイベントを発行して別の連鎖ツリーを生成する場合は、lambda_event組込み関数を使用してください。尚、handle.chainlambda_eventの引数は同じでevent名event情報を指定してください。

Warning

Lambda Functionのスクリプトは自由に記述できますが、デフォルト引数で与えられるhandleの変数名をcontextと記述することもできます。この場合、chainterminateは使えますが、実行空間に存在する共通コンテキストcontextをオーバーライドしてしまうため、共通コンテキストに含まれるプラグインモジュールやプラグインファンクションが利用できなくなるため注意が必要です。

Lambda Functionを作成したら、早速動作を確認しましょう。最初にdebug()でデバッグチャネルに接続します。次にlambda_event組込み関数でイベントを発行し、取得したイベントIDをlambda_convergence組込み関数を使ってイベント連鎖の終端を待ち合わせ、結果を取得しています。

>>> debug()↵
debug channel connected
>>> eventId = await lambda_event("hello", dict(msg="Hello, Lambda!"))↵
... print(eventId)↵
... r = await lambda_convergence(eventId)↵
... print(r.yaml_format)↵
... ↵
e21dc39a3f6611eb850aacde48001122
{'msg': 'Hello, Lambda!'}
↵
chains: []
code: 200
content:
  msg: Hello, Lambda!
event: hello
expire: '2020-12-16T15:26:07.774821+09:00'
id: e21dc39a3f6611eb850aacde48001122
initiate: '2020-12-16T15:21:08.367045+09:00'
occurrence: '2020-12-16T15:21:07.774789+09:00'
output: Processed
terminate: '2020-12-16T15:21:08.369988+09:00'
>>>

Note

lambda_event及びlambda_convergence組込み関数の使用方法は、Docs » リファレンス » ビルトインオブジェクトを参照してください


イベント連鎖

次にイベント連鎖を体験してみましょう。事前準備として単純なATOMオブジェクトを作成します。以下の定義でATOMを作成してください。
このATOMオブジェクトは、最初のイベントでインスタンス化され、その後のイベントでATOMのメソッドを起動して状態を順次遷移させていくような使い方をします。

category: example
name: Phase
attributes:
  local_fields:
    - field_name: status
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_fsm:
        phase1:
          execution_method: phase1
        phase2:
          execution_method: phase2
        phase3:
          execution_method: phase3
  ref_fields: []
  identifier:
    field_name: name
    field_type: string
    field_persistence: true
    field_immutable: true
methods:
  class_methods: []
  instance_methods:
    - method_body: |-
        async def phase1(self, *args, **kwargs):
            import sys
            qprint("%s.%s.%s" % (self.__class__.__name__, self.name, sys._getframe().f_code.co_name))
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        async def phase2(self, *args, **kwargs):
            import sys
            qprint("%s.%s.%s" % (self.__class__.__name__, self.name, sys._getframe().f_code.co_name))
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        async def phase3(self, *args, **kwargs):
            import sys
            qprint("%s.%s.%s" % (self.__class__.__name__, self.name, sys._getframe().f_code.co_name))
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
persistence: true
abstract: false
api_generation: false


ATOMを作成したら動作を確認しましょう。メソッドを実行するとメソッドと同名のstatusに遷移するだけの単純なオブジェクトです。

>>> p = atom.Phase(name="hoge")↵
... print(p.status)↵
... ↵
↵
None
>>> await p.phase1()↵
... print(p.status)↵
... await p.phase2()↵
... print(p.status)↵
... await p.phase3()↵
... print(p.status)↵
... ↵
Phase.hoge.phase1
phase1
Phase.hoge.phase2
phase2
Phase.hoge.phase3
phase3
>>>


では、本題のLambda Functionを作成しましょう。最初のLambda Functionは、phase0イベントに反応します。
イベントに与えられた引数を元に事前準備で作成したATOMのインスタンスを生成してデータベースに保存後、次に連鎖するphase1イベントを発行しています。

event: phase0
name: phase0
script: |-
  async def phase0(event, handle):
      phase = atom.Phase(**event)
      await phase.save()
      qprint("Created new:\n%s" % phase.yaml_format)
      await handle.chain("phase1", phase.dictionary)


2つ目のLambda Functionを作成します。これはphase1イベントに反応します。イベントに含まれるphase0イベントで生成されたATOMインスタンスの識別子を元にデータベースからATOMインスタンスをロードし、phase1メソッドを実行します。次に連鎖するphase2イベントを発行します。

event: phase1
name: phase1
script: |-
  async def phase1(event, handle):
      phase = await atom.Phase.load(event["instance"])
      await phase.phase1()
      await handle.chain("phase2", phase.dictionary)


3つ目のLambda Functionを作成します。これはphase2イベントに反応します。イベントに含まれるphase1イベントから引き継いだATOMインスタンスの識別子を元にデータベースからATOMインスタンスをロードし、phase2メソッドを実行します。次に連鎖するphase3イベントを発行します。

event: phase2
name: phase2
script: |-
  async def phase2(event, handle):
      phase = await atom.Phase.load(event["instance"])
      await phase.phase2()
      await handle.chain("phase3", phase.dictionary)


4つ目のLambda Functionを作成します。これはphase3イベントに反応します。イベントに含まれるphase2イベントから引き継いだATOMインスタンスの識別子を元にデータベースからATOMインスタンスをロードし、phase3メソッドを実行します。次にhandle.terminateに結果を出力した後、データベースからATOMインスタンスを削除します。

event: phase3
name: phase3
script: |-
  async def phase3(event, handle):
      phase = await atom.Phase.load(event["instance"])
      await phase.phase3()
      await handle.terminate(output=phase.dictionary)
      await phase.destroy()


以上でイベント連鎖に使うプラグインは完成したので早速イベントを発行して動作を確認しましょう。

>>> debug()↵
debug channel connected
>>> eventId = await lambda_event("phase0", dict(name="hoge"))↵
... r = await lambda_convergence(eventId)↵
... print(r.yaml_format)↵
... ↵
Created new:
Phase:
  instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
  name: hoge
Phase.hoge.phase1
Phase.hoge.phase2
Phase.hoge.phase3
↵
chains:
- chains:
  - chains:
    - chains: []
      code: 200
      content:
        instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
        name: hoge
        status: phase2
      event: phase3
      expire: '2020-12-16T15:04:05.290258+09:00'
      id: cddaa1ee3f6311eb850aacde48001122
      initiate: '2020-12-16T14:59:06.251735+09:00'
      occurrence: '2020-12-16T14:59:05.290222+09:00'
      output:
        instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
        name: hoge
        status: phase3
      terminate: '2020-12-16T14:59:06.286125+09:00'
    code: 200
    content:
      instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
      name: hoge
      status: phase1
    event: phase2
    expire: '2020-12-16T15:04:04.272636+09:00'
    id: cd3f5b4e3f6311eb850aacde48001122
    initiate: '2020-12-16T14:59:05.250330+09:00'
    occurrence: '2020-12-16T14:59:04.272603+09:00'
    output: null
    terminate: '2020-12-16T14:59:07.303715+09:00'
  code: 200
  content:
    instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
    name: hoge
  event: phase1
  expire: '2020-12-16T15:04:03.249252+09:00'
  id: cca333fe3f6311eb850aacde48001122
  initiate: '2020-12-16T14:59:04.243668+09:00'
  occurrence: '2020-12-16T14:59:03.249226+09:00'
  output: null
  terminate: '2020-12-16T14:59:09.298983+09:00'
code: 200
content:
  name: hoge
event: phase0
expire: '2020-12-16T15:04:02.440918+09:00'
id: cc27dc043f6311eb850aacde48001122
initiate: '2020-12-16T14:59:03.240097+09:00'
occurrence: '2020-12-16T14:59:02.440886+09:00'
output: null
terminate: '2020-12-16T14:59:11.294415+09:00'
>>>

Tip

Lambdaのイベント発行は、lambda_event組込み関数で行ってきましたが、LambdaBooking組込みクラスを使用してイベント発行を予約することもできます。Docs » リファレンス » ビルトインオブジェクトLambdaBookingを参照ください。