Job
Jobサービスは、Scenario
やATOM
のトランザクションの範囲外で非同期に実行する処理を提供します。
Jobサービスでは、メッセージキューを使用して要求タスクをキューイングします。タスクキューのコンシューマはキューから取り出したタスクをJobプラグインにディスパッチして非同期に処理を実行します。尚、メッセージキューには、Redisモード
とAMQP(RabbitMQ推奨)モード
が選択できます。
Redisモード
タスクのキューイングは、Qmonus SDKのプラグインデータストアであるRedisが使用されます。追加のインストールは必要ありません。Normal Queue
、Backup Queue
、Recovery Queue
で構成され、要求タスクの損失を回避するように設計されています。
AMQP(RabbitMQ)モード
タスクのキューイングは、AMQPが使用されます。AMQP(RabbitMQなど)のソフトウェア環境が必要です。また、pythonモジュールを追加インストールする必要があります。
Jobで定義する情報
ジョブによって定義される情報は次のとおりです。
属性1 | 概要 | 備考 |
---|---|---|
name |
名前を指定してください。 | ジョブを識別するためユニークである必要があります。 |
topics |
ジョブがコンシュームするタスクを分類するためのトピックラベルを指定します。 | ジョブはトピックが一致するタスクを処理します。 後述する slack オプションでは、slackからのbot向けのダイレクトメッセージをtopics に指定したデータはslackユーザ名 として認識されます。 |
script |
ジョブとして実行するスクリプトを記述します。 | スクリプトからは、task 変数を使用してタスク情報にアクセスできます。共通コンテキスト とビルトインオブジェクト を使用することができます。Docs » リファレンス » ビルトインオブジェクト 及びDocs » 名前空間 » リファレンス » 共通コンテキスト を参照してください。 |
slack | slack オプションはClassic Slack App として動作するbot機能を実現するオプションです。 |
slack オプションの設定項目は次のとおりです。 |
slack.token |
Bot User OAuth Access Token を指定します。 |
このパラメータは、slackオプションには必須です。また、Qmonus SDKは、rtm:stream の権限を使ってWebsocketでslack.com に接続します。従ってClassic Slack App のbotユーザを作成する必要があることに注意してください。 |
slack.code_injection |
slackに書き込まれたメッセージをスクリプトコードとして認識するモードです。 | デフォルトはFalse に設定されています。True の場合、前述したscript は実行されないことに注意してください。 |
# ジョブスクリプトでは、`task`変数が参照できます。`task`変数はMU型です。
logger.info("%s %s %r" % (task.taskid, task.topic, task.content))
Warning
slack.code_injection
モードでは、import/for/while
ステートメントの実行は許可されていません。 return
ステートメントは結果をslackに書き戻すことができますが、4000文字を超える場合は省略されます。
Jobを起動するタスクについて
前述したJobを動作させるためには、タスクを発行する必要があります。タスクは以下の情報を含んでいます。
属性 | 概要 | 備考 |
---|---|---|
topic |
トピックを指定します。 | トピックが一致するJob が動作します。同一のトピックに対して複数の Job が定義されている場合は、1つのタスク発行で複数のJob が動作することに注意してください。 |
content |
タスクの内容を任意の辞書型(dict)で指定します。 | content は、Job のscript 内で参照することができます。 |
expire |
タスクの有効期限を秒単位で指定します。 | デフォルトは10秒 に設定されています。タスクがキューイングされた状態で何らかの要因で Job が処理されない時間が継続し、expire を経過してしまった場合は、破棄されます。 |
タスクの発行方法
タスクの発行は、/tasks
API呼び出し、task
組込み関数、及びFrontalのGUIから発行することができます。通常は、プラグインのスクリプトコードでtask
関数を利用して発行するケースがほとんどです。
task
関数やタスクの完了を待ち合わせるwaitfor
関数については、Docs » リファレンス » ビルトインオブジェクト
を参照してください。
>>> taskid = await task("hello.job", dict(arg="hello"), expire=30)↵
... r = await waitfor(taskid)↵
... print(r.yaml_format)↵
... ↵
↵
completedAt: '2020-11-26T09:47:27.750420+09:00'
taskid: f43974162f8011eb806facde48001122
>>>
Transaction機能との連携
JobはTransaction機能で発行しているTaskにより実行することもできます。
Qmonus SDKではチューニングパラメータのjob_transaction_event
をtrueに設定することで以下のtopicのTaskがTransactionのステータス変更により自動的に発出されます。
- Transaction.Processing
- Transaction.Complete
- Transaction.Suspending
- Transaction.RecoveryProcessing
- Transaction.RecoveryComplete
- Transaction.ForceRecoveryComplete
- Transaction.Aborted
- Transaction.Cancelling
- Transaction.Cancelled
- Transaction.ForceCancelled
その他にもSDK Runnerのステータス変化により発出するTaskもあります。詳細はDocs » FAQ » Q9. サーバレス実行
をご覧ください。
起動パラメータについて
Job
サービスに関する起動パラメータの設定について以下に記載します。
** Redisモード **
Redisモードは追加のインストールが不要なため、キューの最大サイズやキューからの読み出しサイズの設定だけです。デフォルト値での運用を推奨しています。
-
job_queue_size
:NormalQueue
のキューの長さです。キューの長さを超えると、例外が発生します。デフォルトは65535
に設定されています。 -
job_queue_bulk_read_size
: キューから一括で読み出し、同時に実行するジョブの数を制限します。デフォルトは32
に設定されています。
** AMQPモード **
amqp_url
: AMQPエンドポイントURLを指定します。デフォルトは、None
のため、AMQPモードで動作させる場合は、必ずお使いのAMQPのエンドポイントを指定する必要があります。 AMQPモードは、現時点でRabbitMQ
のみをサポートしていますので、aio-pika
モジュールを追加インストールする必要があります。
pip install aio-pika
Note
Job
をScenario
またはATOM
と組み合わせて、イベント駆動型の非同期オーケストレーションを設計できます。SagaパターンにおけるでChoreographyのイメージです。
チュートリアル
簡単なチュートリアルです。タスクは、タスクの発行方法
に記載した方法で発行できます。task
関数は、シナリオやATOMのカスタムスクリプト、あるいはAPIGWのforbidden_processから呼び出すこともできます。
# The first argument is topic, the second is task content, and the third is expiration seconds.
# The return value is taskid
taskid = await task("hello.job", dict(arg="hello"), expire=10)
# How to call in REST-API
r = await callout(path="/tasks", method="POST", body={"topic": "hello.job", "content": {"arg": "hello"}, "expire": 10})
taskid = json.loads(r.body)["taskid"]
次の job
定義yamlをFrontalからアップロードしてください。
- name: example
topics:
- hello
- world
script: |-
if task.topic=="hello":
qprint("Hello, {}".format(task.content.name))
elif task.topic=="world":
qprint("Bye, {}".format(task.content.name))
インタラクティブシェルからタスクを発行します。
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>> await task("hello", dict(name="amuro"))↵
... await task("world", dict(name="amuro"))↵
... ↵
↵
Hello, amuro
Bye, amuro
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>>