Job
Jobサービスは、ScenarioやATOMのトランザクションの範囲外で非同期に実行する処理を提供します。
Jobサービスでは、メッセージキューを使用して要求タスクをキューイングします。タスクキューのコンシューマはキューから取り出したタスクをJobプラグインにディスパッチして非同期に処理を実行します。尚、メッセージキューには、RedisモードとAMQP(RabbitMQ推奨)モードが選択できます。
Redisモード
タスクのキューイングは、Qmonus SDKのプラグインデータストアであるRedisが使用されます。追加のインストールは必要ありません。Normal Queue、Backup Queue、Recovery Queueで構成され、要求タスクの損失を回避するように設計されています。

AMQP(RabbitMQ)モード
タスクのキューイングは、AMQPが使用されます。AMQP(RabbitMQなど)のソフトウェア環境が必要です。また、pythonモジュールを追加インストールする必要があります。

Jobで定義する情報
ジョブによって定義される情報は次のとおりです。
| 属性1 | 概要 | 備考 |
|---|---|---|
name |
名前を指定してください。 | ジョブを識別するためユニークである必要があります。 |
topics |
ジョブがコンシュームするタスクを分類するためのトピックラベルを指定します。 | ジョブはトピックが一致するタスクを処理します。 後述する slackオプションでは、slackからのbot向けのダイレクトメッセージをtopicsに指定したデータはslackユーザ名として認識されます。 |
script |
ジョブとして実行するスクリプトを記述します。 | スクリプトからは、task変数を使用してタスク情報にアクセスできます。共通コンテキストとビルトインオブジェクトを使用することができます。Docs » リファレンス » ビルトインオブジェクト及びDocs » 名前空間 » リファレンス » 共通コンテキストを参照してください。 |
| slack | slackオプションはClassic Slack App として動作するbot機能を実現するオプションです。 |
slackオプションの設定項目は次のとおりです。 |
slack.token |
Bot User OAuth Access Tokenを指定します。 |
このパラメータは、slackオプションには必須です。また、Qmonus SDKは、rtm:streamの権限を使ってWebsocketでslack.comに接続します。従ってClassic Slack Appのbotユーザを作成する必要があることに注意してください。 |
slack.code_injection |
slackに書き込まれたメッセージをスクリプトコードとして認識するモードです。 | デフォルトはFalseに設定されています。Trueの場合、前述したscriptは実行されないことに注意してください。 |
# ジョブスクリプトでは、`task`変数が参照できます。`task`変数はMU型です。
logger.info("%s %s %r" % (task.taskid, task.topic, task.content))
Warning
slack.code_injectionモードでは、import/for/whileステートメントの実行は許可されていません。 returnステートメントは結果をslackに書き戻すことができますが、4000文字を超える場合は省略されます。
Jobを起動するタスクについて
前述したJobを動作させるためには、タスクを発行する必要があります。タスクは以下の情報を含んでいます。
| 属性 | 概要 | 備考 |
|---|---|---|
topic |
トピックを指定します。 | トピックが一致するJobが動作します。同一のトピックに対して複数の Jobが定義されている場合は、1つのタスク発行で複数のJobが動作することに注意してください。 |
content |
タスクの内容を任意の辞書型(dict)で指定します。 | contentは、Jobのscript内で参照することができます。 |
expire |
タスクの有効期限を秒単位で指定します。 | デフォルトは10秒に設定されています。タスクがキューイングされた状態で何らかの要因で Jobが処理されない時間が継続し、expireを経過してしまった場合は、破棄されます。 |
タスクの発行方法
タスクの発行は、/tasksAPI呼び出し、task組込み関数、及びFrontalのGUIから発行することができます。通常は、プラグインのスクリプトコードでtask関数を利用して発行するケースがほとんどです。
task関数やタスクの完了を待ち合わせるwaitfor関数については、Docs » リファレンス » ビルトインオブジェクトを参照してください。
>>> taskid = await task("hello.job", dict(arg="hello"), expire=30)↵
... r = await waitfor(taskid)↵
... print(r.yaml_format)↵
... ↵
↵
completedAt: '2020-11-26T09:47:27.750420+09:00'
taskid: f43974162f8011eb806facde48001122
>>>
Transaction機能との連携
JobはTransaction機能で発行しているTaskにより実行することもできます。
Qmonus SDKではチューニングパラメータのjob_transaction_eventをtrueに設定することで以下のtopicのTaskがTransactionのステータス変更により自動的に発出されます。
- Transaction.Processing
- Transaction.Complete
- Transaction.Suspending
- Transaction.RecoveryProcessing
- Transaction.RecoveryComplete
- Transaction.ForceRecoveryComplete
- Transaction.Aborted
- Transaction.Cancelling
- Transaction.Cancelled
- Transaction.ForceCancelled
その他にもSDK Runnerのステータス変化により発出するTaskもあります。詳細はDocs » FAQ » Q9. サーバレス実行をご覧ください。
起動パラメータについて
Jobサービスに関する起動パラメータの設定について以下に記載します。
Redisモード
Redisモードは追加のインストールが不要なため、キューの最大サイズやキューからの読み出しサイズの設定だけです。デフォルト値での運用を推奨しています。
-
job_queue_size:NormalQueueのキューの長さです。キューの長さを超えると、例外が発生します。デフォルトは65535に設定されています。 -
job_queue_bulk_read_size: キューから一括で読み出し、同時に実行するジョブの数を制限します。デフォルトは32に設定されています。
AMQPモード
amqp_url: AMQPエンドポイントURLを指定します。デフォルトは、Noneのため、AMQPモードで動作させる場合は、必ずお使いのAMQPのエンドポイントを指定する必要があります。 AMQPモードは、現時点でRabbitMQのみをサポートしていますので、aio-pikaモジュールを追加インストールする必要があります。
pip install aio-pika
Note
JobをScenarioまたはATOMと組み合わせて、イベント駆動型の非同期オーケストレーションを設計できます。SagaパターンにおけるでChoreographyのイメージです。

チュートリアル
簡単なチュートリアルです。タスクは、タスクの発行方法に記載した方法で発行できます。task関数は、シナリオやATOMのカスタムスクリプト、あるいはAPIGWのforbidden_processから呼び出すこともできます。
# The first argument is topic, the second is task content, and the third is expiration seconds.
# The return value is taskid
taskid = await task("hello.job", dict(arg="hello"), expire=10)
# How to call in REST-API
r = await callout(path="/tasks", method="POST", body={"topic": "hello.job", "content": {"arg": "hello"}, "expire": 10})
taskid = json.loads(r.body)["taskid"]
次の job定義yamlをFrontalからアップロードしてください。
- name: example
topics:
- hello
- world
script: |-
if task.topic=="hello":
qprint("Hello, {}".format(task.content.name))
elif task.topic=="world":
qprint("Bye, {}".format(task.content.name))
インタラクティブシェルからタスクを発行します。
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>> await task("hello", dict(name="amuro"))↵
... await task("world", dict(name="amuro"))↵
... ↵
↵
Hello, amuro
Bye, amuro
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>>