Lambda Function
Lambda Function
サービスは、イベントチェインによる連鎖的な処理モデルを提供する機能です。Lambda Function
は、イベントによってトリガーされますが、処理の中でイベントを発行していくとそのチェインを管理することができます。
Note
タスクイベントによってトリガーされ、任意の処理を実行できるJob
サービスと似て非なる機能です。Job
の処理の中で別のタスクを発行できますが、その発行元となったタスクと発行先のタスクはなんら関係を持たず独立しています。また、Job
とは異なり、Lambda Function
では、イベントチェインが管理されているため、最終的な処理結果を受け取ることができます。
Job連鎖 | Lambda Function連鎖 |
---|---|
Lambda Functionの定義項目
属性 | 概要 | 備考 |
---|---|---|
event |
Lambda Functionにディスパッチするイベント名を指定します。 | 指定したイベント名のイベントが検出された場合にLambda Functionが発動します。 |
script |
Lambda Functionが実行するカスタムスクリプトを実装します。 | カスタムスクリプトには1つの非同期関数のみ定義してください。関数名は自由に定義してください。 引数は event とhandle の2つの変数が与えられます。event には、イベント発行時に与えられる任意の辞書オブジェクトが格納されます。handle には、イベントチェインを操作する関数が格納されます。 |
success |
Lambda Functionのカスタムスクリプトが正常終了した場合に連鎖発行するイベントを指定します。 | 本項目はオプショナルです。 |
failure |
Lambda Functionのカスタムスクリプトが異常終了(例外をスロー)した場合に連鎖発行するイベントを指定します。 | 本項目はオプショナルです。 |
# script(関数定義)の例
async def helloLambda(event, handle):
pass
チュートリアル
はじめてのLambda Function
まずは簡単なLambda Functionを体験してみましょう。最初のチュートリアルでは、連鎖しない単発のイベントハンドラを作成してみます。
以下の定義でLambda Functionを作成してください。
Lambda Functionは、非同期関数で定義します。また、引数にはevent
とhandle
が与えられます。event
変数には、イベント発行時に与えられる任意の辞書オブジェクトが格納されます。handle
にはイベントチェインに対する操作関数が格納されています。また、動作確認をREPLで行うため、実行空間を超えて処理状況が把握できるようqprint
組込み関数を使用してデバッグチャネルに出力します。
event: hello
name: helloLambda
script: |-
async def helloLambda(event, handle):
qprint(event)
await handle.terminate(output="Processed")
Tip
handle
は、lamdbaイベントチェインに対する操作を行うコンテキストです。handle.terminate
は出力結果を返却したい場合に利用します。次のチュートリアルで登場するhandle.chain
は、連鎖イベントを発行する関数です。連鎖イベントではなく新規のイベントを発行して別の連鎖ツリーを生成する場合は、lambda_event
組込み関数を使用してください。尚、handle.chain
とlambda_event
の引数は同じでevent名
とevent情報
を指定してください。
Warning
Lambda Functionのスクリプトは自由に記述できますが、デフォルト引数で与えられるhandle
の変数名をcontext
と記述することもできます。この場合、chain
やterminate
は使えますが、実行空間に存在する共通コンテキストcontext
をオーバーライドしてしまうため、共通コンテキストに含まれるプラグインモジュールやプラグインファンクションが利用できなくなるため注意が必要です。
Lambda Functionを作成したら、早速動作を確認しましょう。最初にdebug()
でデバッグチャネルに接続します。次にlambda_event
組込み関数でイベントを発行し、取得したイベントIDをlambda_convergence
組込み関数を使ってイベント連鎖の終端を待ち合わせ、結果を取得しています。
>>> debug()↵
debug channel connected
>>> eventId = await lambda_event("hello", dict(msg="Hello, Lambda!"))↵
... print(eventId)↵
... r = await lambda_convergence(eventId)↵
... print(r.yaml_format)↵
... ↵
e21dc39a3f6611eb850aacde48001122
{'msg': 'Hello, Lambda!'}
↵
chains: []
code: 200
content:
msg: Hello, Lambda!
event: hello
expire: '2020-12-16T15:26:07.774821+09:00'
id: e21dc39a3f6611eb850aacde48001122
initiate: '2020-12-16T15:21:08.367045+09:00'
occurrence: '2020-12-16T15:21:07.774789+09:00'
output: Processed
terminate: '2020-12-16T15:21:08.369988+09:00'
>>>
Note
lambda_event
及びlambda_convergence
組込み関数の使用方法は、Docs » リファレンス » ビルトインオブジェクト
を参照してください
イベント連鎖
次にイベント連鎖を体験してみましょう。事前準備として単純なATOMオブジェクトを作成します。以下の定義でATOMを作成してください。
このATOMオブジェクトは、最初のイベントでインスタンス化され、その後のイベントでATOMのメソッドを起動して状態を順次遷移させていくような使い方をします。
category: example
name: Phase
attributes:
local_fields:
- field_name: status
field_type: string
field_persistence: true
field_nullable: true
field_immutable: false
field_unique: false
field_fsm:
phase1:
execution_method: phase1
phase2:
execution_method: phase2
phase3:
execution_method: phase3
ref_fields: []
identifier:
field_name: name
field_type: string
field_persistence: true
field_immutable: true
methods:
class_methods: []
instance_methods:
- method_body: |-
async def phase1(self, *args, **kwargs):
import sys
qprint("%s.%s.%s" % (self.__class__.__name__, self.name, sys._getframe().f_code.co_name))
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- method_body: |-
async def phase2(self, *args, **kwargs):
import sys
qprint("%s.%s.%s" % (self.__class__.__name__, self.name, sys._getframe().f_code.co_name))
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
- method_body: |-
async def phase3(self, *args, **kwargs):
import sys
qprint("%s.%s.%s" % (self.__class__.__name__, self.name, sys._getframe().f_code.co_name))
propagation_mode: true
topdown: true
auto_rollback: true
multiplexable_number: 1
field_order: ascend
persistence: true
abstract: false
api_generation: false
ATOMを作成したら動作を確認しましょう。メソッドを実行するとメソッドと同名のstatusに遷移するだけの単純なオブジェクトです。
>>> p = atom.Phase(name="hoge")↵
... print(p.status)↵
... ↵
↵
None
>>> await p.phase1()↵
... print(p.status)↵
... await p.phase2()↵
... print(p.status)↵
... await p.phase3()↵
... print(p.status)↵
... ↵
Phase.hoge.phase1
phase1
Phase.hoge.phase2
phase2
Phase.hoge.phase3
phase3
>>>
では、本題のLambda Functionを作成しましょう。最初のLambda Functionは、phase0
イベントに反応します。
イベントに与えられた引数を元に事前準備で作成したATOMのインスタンスを生成してデータベースに保存後、次に連鎖するphase1
イベントを発行しています。
event: phase0
name: phase0
script: |-
async def phase0(event, handle):
phase = atom.Phase(**event)
await phase.save()
qprint("Created new:\n%s" % phase.yaml_format)
await handle.chain("phase1", phase.dictionary)
2つ目のLambda Functionを作成します。これはphase1
イベントに反応します。イベントに含まれるphase0
イベントで生成されたATOMインスタンスの識別子を元にデータベースからATOMインスタンスをロードし、phase1
メソッドを実行します。次に連鎖するphase2
イベントを発行します。
event: phase1
name: phase1
script: |-
async def phase1(event, handle):
phase = await atom.Phase.load(event["instance"])
await phase.phase1()
await handle.chain("phase2", phase.dictionary)
3つ目のLambda Functionを作成します。これはphase2
イベントに反応します。イベントに含まれるphase1
イベントから引き継いだATOMインスタンスの識別子を元にデータベースからATOMインスタンスをロードし、phase2
メソッドを実行します。次に連鎖するphase3
イベントを発行します。
event: phase2
name: phase2
script: |-
async def phase2(event, handle):
phase = await atom.Phase.load(event["instance"])
await phase.phase2()
await handle.chain("phase3", phase.dictionary)
4つ目のLambda Functionを作成します。これはphase3
イベントに反応します。イベントに含まれるphase2
イベントから引き継いだATOMインスタンスの識別子を元にデータベースからATOMインスタンスをロードし、phase3
メソッドを実行します。次にhandle.terminate
に結果を出力した後、データベースからATOMインスタンスを削除します。
event: phase3
name: phase3
script: |-
async def phase3(event, handle):
phase = await atom.Phase.load(event["instance"])
await phase.phase3()
await handle.terminate(output=phase.dictionary)
await phase.destroy()
以上でイベント連鎖に使うプラグインは完成したので早速イベントを発行して動作を確認しましょう。
>>> debug()↵
debug channel connected
>>> eventId = await lambda_event("phase0", dict(name="hoge"))↵
... r = await lambda_convergence(eventId)↵
... print(r.yaml_format)↵
... ↵
Created new:
Phase:
instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
name: hoge
Phase.hoge.phase1
Phase.hoge.phase2
Phase.hoge.phase3
↵
chains:
- chains:
- chains:
- chains: []
code: 200
content:
instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
name: hoge
status: phase2
event: phase3
expire: '2020-12-16T15:04:05.290258+09:00'
id: cddaa1ee3f6311eb850aacde48001122
initiate: '2020-12-16T14:59:06.251735+09:00'
occurrence: '2020-12-16T14:59:05.290222+09:00'
output:
instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
name: hoge
status: phase3
terminate: '2020-12-16T14:59:06.286125+09:00'
code: 200
content:
instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
name: hoge
status: phase1
event: phase2
expire: '2020-12-16T15:04:04.272636+09:00'
id: cd3f5b4e3f6311eb850aacde48001122
initiate: '2020-12-16T14:59:05.250330+09:00'
occurrence: '2020-12-16T14:59:04.272603+09:00'
output: null
terminate: '2020-12-16T14:59:07.303715+09:00'
code: 200
content:
instance: UGhhc2U6Y2NhMWZmZTgzZjYzMTFlYjg1MGFhY2RlNDgwMDExMjI=
name: hoge
event: phase1
expire: '2020-12-16T15:04:03.249252+09:00'
id: cca333fe3f6311eb850aacde48001122
initiate: '2020-12-16T14:59:04.243668+09:00'
occurrence: '2020-12-16T14:59:03.249226+09:00'
output: null
terminate: '2020-12-16T14:59:09.298983+09:00'
code: 200
content:
name: hoge
event: phase0
expire: '2020-12-16T15:04:02.440918+09:00'
id: cc27dc043f6311eb850aacde48001122
initiate: '2020-12-16T14:59:03.240097+09:00'
occurrence: '2020-12-16T14:59:02.440886+09:00'
output: null
terminate: '2020-12-16T14:59:11.294415+09:00'
>>>
Tip
Lambdaのイベント発行は、lambda_event
組込み関数で行ってきましたが、LambdaBooking
組込みクラスを使用してイベント発行を予約することもできます。Docs » リファレンス » ビルトインオブジェクト
のLambdaBooking
を参照ください。