Job

Jobサービスは、ScenarioATOMのトランザクションの範囲外で非同期に実行する処理を提供します。 Jobサービスでは、メッセージキューを使用して要求タスクをキューイングします。タスクキューのコンシューマはキューから取り出したタスクをJobプラグインにディスパッチして非同期に処理を実行します。尚、メッセージキューには、RedisモードAMQP(RabbitMQ推奨)モードが選択できます。


Redisモード

タスクのキューイングは、Qmonus SDKのプラグインデータストアであるRedisが使用されます。追加のインストールは必要ありません。Normal QueueBackup QueueRecovery Queueで構成され、要求タスクの損失を回避するように設計されています。


AMQP(RabbitMQ)モード

タスクのキューイングは、AMQPが使用されます。AMQP(RabbitMQなど)のソフトウェア環境が必要です。また、pythonモジュールを追加インストールする必要があります。


Jobで定義する情報

ジョブによって定義される情報は次のとおりです。

属性1 概要 備考
name 名前を指定してください。 ジョブを識別するためユニークである必要があります。
topics ジョブがコンシュームするタスクを分類するためのトピックラベルを指定します。 ジョブはトピックが一致するタスクを処理します。
後述するslackオプションでは、slackからのbot向けのダイレクトメッセージをjobの起動トリガーにすることが可能ですが、この場合はtopicsに指定したデータはダイレクトメッセージを送信するslackユーザ名として認識されます。
script ジョブとして実行するスクリプトを記述します。 スクリプトからは、task変数を使用してタスク情報にアクセスできます。スクリプトが実行される名前空間では、共通コンテキストビルトインオブジェクトを使用することができます。これらについては、Docs » リファレンス » ビルトインオブジェクト及びDocs » 名前空間 » リファレンス » 共通コンテキストを参照してください。
slack slackオプションはClassic Slack App として動作するbot機能を実現するオプションです。 slackオプションの設定項目は次のとおりです。
slack.token Bot User OAuth Access Tokenを指定します。 このパラメータは、slackオプションには必須です。また、Qmonus SDKは、rtm:streamの権限を使ってWebsocketでslack.comに接続します。従ってClassic Slack Appのbotユーザを作成する必要があることに注意してください。
slack.code_injection slackに書き込まれたメッセージをスクリプトコードとして認識するモードです。 デフォルトはFalseに設定されています。Trueの場合、前述したscriptは実行されないことに注意してください。
# ジョブスクリプトでは、`task`変数が参照できます。`task`変数はMU型です。
logger.info("%s %s %r" % (task.taskid, task.topic, task.content))

Warning

slack.code_injectionモードでは、import/for/whileステートメントの実行は許可されていません。 returnステートメントは結果をslackに書き戻すことができますが、4000文字を超える場合は省略されます。


Jobを起動するタスクについて

前述したJobを動作させるためには、タスクを発行する必要があります。タスクは以下の情報を含んでいます。

属性 概要 備考
topic トピックを指定します。 トピックが一致するJobが動作します。
同一のトピックに対して複数のJobが定義されている場合は、1つのタスク発行で複数のJobが動作することに注意してください。
content タスクの内容を任意の辞書型(dict)で指定します。 contentは、Jobscript内で参照することができます。
expire タスクの有効期限を秒単位で指定します。 デフォルトは10秒に設定されています。
タスクがキューイングされた状態で何らかの要因でJobが処理されない時間が継続し、expireを経過してしまった場合は、破棄されます。


タスクの発行方法

タスクの発行は、/tasksAPI呼び出し、task組込み関数、及びFrontalのGUIから発行することができます。通常は、プラグインのスクリプトコードでtask関数を利用して発行するケースがほとんどです。 task関数やタスクの完了を待ち合わせるwaitfor関数については、Docs » リファレンス » ビルトインオブジェクトを参照してください。

>>> taskid = await task("hello.job", dict(arg="hello"), expire=30)↵
... r = await waitfor(taskid)↵
... print(r.yaml_format)↵
... ↵
↵
completedAt: '2020-11-26T09:47:27.750420+09:00'
taskid: f43974162f8011eb806facde48001122
>>>


Transaction機能との連携

JobはTransaction機能で発行しているTaskにより実行することもできます。
Qmonus SDKではチューニングパラメータのjob_transaction_eventをtrueに設定することで以下のtopicのTaskがTransactionのステータス変更により自動的に発出されます。

  • Transaction.Processing
  • Transaction.Complete
  • Transaction.Suspending
  • Transaction.RecoveryProcessing
  • Transaction.RecoveryComplete
  • Transaction.ForceRecoveryComplete
  • Transaction.Aborted
  • Transaction.Cancelling
  • Transaction.Cancelled
  • Transaction.ForceCancelled

その他にもSDK Runnerのステータス変化により発出するTaskもあります。詳細はDocs » FAQ » Q9. サーバレス実行をご覧ください。


起動パラメータについて

Jobサービスに関する起動パラメータの設定について以下に記載します。

** Redisモード **

Redisモードは追加のインストールが不要なため、キューの最大サイズやキューからの読み出しサイズの設定だけです。デフォルト値での運用を推奨しています。

  • job_queue_size: NormalQueueのキューの長さです。キューの長さを超えると、例外が発生します。デフォルトは65535に設定されています。

  • job_queue_bulk_read_size: キューから一括で読み出し、同時に実行するジョブの数を制限します。デフォルトは32に設定されています。


** AMQPモード **

  • amqp_url: AMQPエンドポイントURLを指定します。デフォルトは、Noneのため、AMQPモードで動作させる場合は、必ずお使いのAMQPのエンドポイントを指定する必要があります。 AMQPモードは、現時点でRabbitMQのみをサポートしていますので、aio-pikaモジュールを追加インストールする必要があります。
pip install aio-pika


Note

JobScenarioまたはATOMと組み合わせて、イベント駆動型の非同期オーケストレーションを設計できます。SagaパターンにおけるでChoreographyのイメージです。


チュートリアル

簡単なチュートリアルです。タスクは、タスクの発行方法に記載した方法で発行できます。task関数は、シナリオやATOMのカスタムスクリプト、あるいはAPIGWのforbidden_processから呼び出すこともできます。

# The first argument is topic, the second is task content, and the third is expiration seconds.
# The return value is taskid
taskid = await task("hello.job", dict(arg="hello"), expire=10)

# How to call in REST-API
r = await callout(path="/tasks", method="POST", body={"topic": "hello.job", "content": {"arg": "hello"}, "expire": 10})
taskid = json.loads(r.body)["taskid"]


次の job定義yamlをFrontalからアップロードしてください。

- name: example
  topics:
    - hello
    - world
  script: |-
    if task.topic=="hello":
        qprint("Hello, {}".format(task.content.name))
    elif task.topic=="world":
        qprint("Bye, {}".format(task.content.name))


インタラクティブシェルからタスクを発行します。

>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>> await task("hello", dict(name="amuro"))↵
... await task("world", dict(name="amuro"))↵
... ↵
↵
Hello, amuro
Bye, amuro
>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.hotspot.channel']
debug channel connected
>>>