Schedule

Scheduleサービスは、指定した絶対時刻に実行されるWebフックを予約できる機能です。


管理モデルについて

Webフック予約は、point型とterm型の2種類の概念があります。
point型は、ある時刻に単一のWebフックアクションを実行する予約です。
term型は、期間予約の概念で、開始時刻と開始アクション、終了時刻と終了アクションを持ちます。

Scheduleサービスでは、LifePlanという2つの概念モデルで情報管理されます。
Lifeオブジェクトは、予約そのものを管理する概念です。
Planオブジェクトは、予約時刻にトリガーされるWebフックアクションを管理する概念です。
オブジェクトの関連図を以下に記載します。


Lifeオブジェクトについて

Lifeオブジェクトは、指定された時刻にWebフックアクションを実行するpoint型と、指定された開始時刻と終了時刻にそれぞれWebフックアクションを実行するterm型に分類されます。

Lifeオブジェクトの状態遷移(point型)

Lifeオブジェクトの状態遷移(term型)


Planオブジェクトについて

Planオブジェクトは、指定された絶対時刻に実行されるWebフックアクションを表すモデルです。Planオブジェクトは、Birth型とDeath型の2つのイベントタイプがあります。
point型のLifeオブジェクトの場合は、Birth型のPlanのみ生成されます。
term型のLifeオブジェクトの場合は、Birth型、Death型の2つのPlanオブジェクトが生成されます。

Planオブジェクトの状態遷移(birth型)

Planオブジェクトの状態遷移(death型)

予約情報で管理されるパラメータについて

予約情報(Lifeオブジェクト)でプラグイン開発者が指定する管理パラメータについて解説します。

属性 概要 備考
life_uuid 予約をユニークに識別するIDを指定します。 後述するBooking組込みオブジェクトや拡張ヘッダを利用した予約登録時はデフォルトで自動付与されますが、直接APIを利用する場合は、プラグイン開発者が指定する必要があります。
schedule_type 予約タイプを指定します。pointまたはtermのいずれかを指定する必要があります。 後述するBooking組込みオブジェクトや拡張ヘッダを利用した予約登録時はpoint固定でプラグイン開発者が指定することはできません。
resource_id 予約の対象となるリソースが存在する場合にリソースの識別子を指定することができます。 指定した場合、同一リソース識別子を指定した予約が同時に実行されないように後発の予約はブロックされます。ただし、予約には、実行遅延保護時間(後述するexecution_delay_guard_time)の概念があり、この時間内は予約実行が遅延する可能性があると解釈されますので、遅延保護時間を加味した期間内に同一リソースに対する予約が存在しないことがチェック条件となる点に注意してください。また、実行遅延保護時間のデフォルト値は60分と長めの設定となっているのでお使いの条件に合わせてチューニングしてください。
birth_time Webフックアクションの実行時刻(開始)を指定します。 %Y-%m-%d%H:%M:%S形式で指定する必要があります。timezone起動パラメータに指定されたタイムゾーンで解釈されることに注意してください。
death_time Webフックアクションの実行時刻(終了)を指定します。 %Y-%m-%d%H:%M:%S形式で指定する必要があります。timezone起動パラメータに指定されたタイムゾーンで解釈されることに注意してください。schedule_type=pointの場合、本パラメータは無効です。
birth_action Webフックアクション(開始)を指定します。 pathheadersmethodbodyconnect_timeoutrequest_timeoutretry_countretry_intervalを指定できます。pathmethodは必須です。
death_action Webフックアクション(終了)を指定します。 pathheadersmethodbodyconnect_timeoutrequest_timeoutretry_countretry_intervalを指定できます。pathmethodは必須です。schedule_type=pointの場合、本パラメータは無効です。

Note

birth_actiondeath_actionのWebフック先のAPIは、API Gatewayにルーティング登録されているものが対象です。ScheduleサーバからのWebフックは全てAPI Gatewayに向かって送信されます。


point型Webフック予約のチュートリアル

point型の予約操作は、Booking組込みオブジェクトで簡単に行えます。また、Qmonus SDKのAPI Gatewayに登録されているAPIルーティングに対してX-Xaas-Schedule-Datetime拡張ヘッダを指定して呼び出すことで予約することもできます。

point型予約の使用方法について簡単なチュートリアルで体験しましょう。事前準備として予約のWebフック先となる簡単なAPIを用意しておきます。 以下の定義でシナリオを作成してください。

- category: example
  name: hello
  uri: /hello
  method: GET
  routing_auto_generation_mode: true
  routing_options:
    scope: local
  commands:
    - command: script
      kwargs:
        code: 'qprint("hello, %s" % clock.now())'

Note

用意するシナリオAPIは、予約時刻にScheduleサーバからキックされるので実行結果をREPLで確認できるようqprint組込み関数でメッセージを出力します。REPLで予約しますが、シナリオの実行空間が異なるため、通常のprint文ではREPLに出力をフックできません。そのため、qprintを使ってメッセージをデバッグ用のpubsubチャネルにpublishしてREPLのdebugモードでsubscribeすることで動作をモニタリングします。


シナリオを作成したら、以下のようにBooking組込みオブジェクトを利用して動作を確認できます。

>>> debug()↵
debug channel connected
>>> reservation = Booking("/hello", reservationDatetime=clock.after_now(minutes=1))↵
... await reservation.save()↵
... print("予約時刻: %s" % reservation.reservationDatetime)↵
... ↵
↵
予約時刻: 2020-12-15 10:06:41.618554+09:00
>>> schedules↵
a6d5c6983e7111ebac92acde48001122 a6d5c6983e7111ebac92acde48001122 2020-12-15 10:06:41 Inexistent
hello, 2020-12-15 10:06:42.523634+09:00
>>> schedules↵
a6d5c6983e7111ebac92acde48001122 a6d5c6983e7111ebac92acde48001122 2020-12-15 10:06:41 Dead
>>>

Note

qprintBooking組込みオブジェクトの使用方法については、Docs » リファレンス » ビルトインオブジェクトを参照してください。


次に拡張ヘッダを指定したAPI呼び出しで予約動作を確認します。

>>> debug()↵
debug channel connected
>>> response = await callout(path="/hello", headers={"X-Xaas-Schedule-Datetime": "2020-12-15 10:37:00"})↵
... print(response.code)↵
... print(response.body)↵
... ↵
↵
202
b'{"life_uuid":"cb7adb103e7511ebb42cacde48001122"}'
hello, 2020-12-15 10:37:00.019690+09:00
>>>

Tip

X-Xaas-Schedule-Datetimeは、Qmonus SDKのAPI Gatewayがサポートする独自の拡張ヘッダです。%Y-%m-%d %H:%M:%S形式で時刻を指定します。
本ヘッダが指定されたAPIリクエストは、API GatewayからScheduleサーバに自動連携され、予約登録後、予約ID(life_uuid)がレスポンスとして返却されます。


term型Webフック予約のチュートリアル

term型の予約操作は、Scheduleサーバが提供するAPIを直接利用する方法のみ提供しています。
term型予約の使用方法について簡単なチュートリアルで体験しましょう。term型の予約は、なんらかのリソースを期間的に予約利用するようなユースケースを想定しています。事前準備として簡単なリソースモデルを作成してみましょう。
以下の定義でATOMを作成してください。このConnectionクラスは、activatedeactivateという2つのメソッドを持っています。term型予約を使って予約開始時刻にactivateさせ、予約終了時刻にdeactivateさせるような使い方を想定してみましょう。

category: example
name: Connection
attributes:
  identifier:
    field_name: id
    field_type: string
    field_persistence: true
    field_immutable: true
    field_default: uuid.uuid1().hex
  local_fields:
    - field_name: status
      field_type: string
      field_persistence: true
      field_nullable: true
      field_immutable: false
      field_unique: false
      field_default: Inactive
      field_fsm:
        active:
          execution_method: activate
        inactive:
          execution_method: deactivate
  ref_fields: []
methods:
  class_methods: []
  instance_methods:
    - method_body: |-
        async def activate(self, *args, **kwargs):
            qprint("%s Connection Activated %r" % (clock.now(), self.id))
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
    - method_body: |-
        async def deactivate(self, *args, **kwargs):
            qprint("%s Connection Deactivated %r" % (clock.now(), self.id))
      propagation_mode: true
      topdown: true
      auto_rollback: true
      multiplexable_number: 1
      field_order: ascend
persistence: true
abstract: false
api_generation: false


ATOMを作成したら動作確認しておきましょう。

>>> debug()↵
debug channel connected
>>> conn = atom.Connection()↵
... await conn.activate()↵
... ↵
2020-12-15 14:37:31.267178+09:00 Connection Activated 'a02238ec3e9711eb93a8acde48001122'
↵
>>> await conn.deactivate()↵
... ↵
2020-12-15 14:37:38.363509+09:00 Connection Deactivated 'a02238ec3e9711eb93a8acde48001122'
↵
>>>


次にWebフック予約の呼び出し先になるAPIを準備する必要があります。シナリオで作成するのも良いですが、ATOMのメソッド呼び出しができれば良いレベルなのでここではJobサービスを使用してみましょう。 以下の定義でJobを作成してください。このJobは、タスクのコンテント情報からidを取り出してConnectionインスタンスをロードした後、コンテント情報のmethodを取り出してロードしたインスタンスのメソッドを起動するだけのものです。

- name: ConnectionHandler
  topics:
    - booking
  script: |-
    conn = await atom.Connection.load(task.content.id)
    await getattr(conn, task.content.method)()

Jobを作成したら動作確認しておきましょう。

>>> conn = atom.Connection()↵
... await conn.save()↵
... ↵
↵
>>> print(conn.id)↵
... ↵
↵
ce72cdd23e9811eb93a8acde48001122
>>> debug()↵
debug channel connected
>>> await task("booking", dict(id="ce72cdd23e9811eb93a8acde48001122", method="activate"))↵
... ↵
↵
2020-12-15 14:47:14.889695+09:00 Connection Activated 'ce72cdd23e9811eb93a8acde48001122'
>>> await task("booking", dict(id="ce72cdd23e9811eb93a8acde48001122", method="deactivate"))↵
... ↵
↵
2020-12-15 14:47:23.937942+09:00 Connection Deactivated 'ce72cdd23e9811eb93a8acde48001122'
>>>


以上で、事前準備は完了です。ScheduleサーバのAPIを利用してterm型の予約を登録してみましょう。
body構造が少し複雑な為、MUを使って構築しています。予約の呼び出し先は、Jobを起動するタスクを登録するTaskサービスのAPIです。

>>> m = MU()↵
... m.life_uuid = uuid.uuid1().hex↵
... m.schedule_type = "term"↵
... m.term = MU(dict(birth_time="2020-12-15 15:05:00", death_time="2020-12-15 15:08:00"))↵
... m.birth = MU(dict(path="/tasks", method=POST, body=dict(topic="booking", content=dict(id="a02238ec3e9711eb93a8acde48001122", method="activate"))))↵
... m.death = MU(dict(path="/tasks", method=POST, body=dict(topic="booking", content=dict(id="a02238ec3e9711eb93a8acde48001122", method="deactivate"))))↵
... print(json.dumps(m.dictionary, indent=4))↵
... ↵
↵
{
    "life_uuid": "6f69aeb63e9b11eb93a8acde48001122",
    "schedule_type": "term",
    "term": {
        "birth_time": "2020-12-15 15:05:00",
        "death_time": "2020-12-15 15:08:00"
    },
    "birth": {
        "path": "/tasks",
        "method": "POST",
        "body": {
            "topic": "booking",
            "content": {
                "id": "a02238ec3e9711eb93a8acde48001122",
                "method": "activate"
            }
        }
    },
    "death": {
        "path": "/tasks",
        "method": "POST",
        "body": {
            "topic": "booking",
            "content": {
                "id": "a02238ec3e9711eb93a8acde48001122",
                "method": "deactivate"
            }
        }
    }
}
>>> r = await callout(path="/schedules", method=POST, body=m.dictionary)↵
... print(r.code)↵
... print(r.body)↵
... ↵
↵
200
b'{"life_uuid":"6f69aeb63e9b11eb93a8acde48001122"}'
2020-12-15 15:05:01.255930+09:00 Connection Activated 'a02238ec3e9711eb93a8acde48001122'
2020-12-15 15:08:01.565405+09:00 Connection Deactivated 'a02238ec3e9711eb93a8acde48001122'
>>>


起動パラメータについて

Scheduleサーバの動作に影響する起動パラメータについて解説します。

起動パラメータ デフォルト値 説明
booking_plan_watch_interval 10000 予約実行計画デーモンの処理間隔(msec)
preset_execution_time 5 予約開始の事前準備を開始する時間(min)
minimum_life_term 3 term型予約の最小生存期間(min)
execution_guard_time 30 予約実行保護時間(sec)
execution_delay_guard_time 60 予約実行遅延保護時間(min)
birth_delay_limit_time 3 予約開始の遅延許容限界時間(min)
death_retry_interval 1 予約終了アクションのリトライ間隔(min)
schedule_history_duration_days 1 予約実行履歴保持期間(day)
timedout_queue_max_size 256 アクション実行待ちキューの最大サイズ
execution_retry_codes 500,502,503,504,599 実行リトライ対象エラーコード

Note

execution_retry_codes は、v22.2LTS-patch20220330 で追加された起動パラメータです。以前のversionでは500系エラーの場合は無条件にリトライが実行されていました。


Scheduleサーバは、登録されている予約情報(Lifeオブジェクト)を定周期で監視します。この周期は、booking_plan_watch_intervalで指定されている間隔です。
予約実行準備フェーズの対象予約を検出するとメモリ上のプリセットタイマーに予約アクション(Planオブジェクト)をエントリします。

Note

Planオブジェクトは以下の場合エントリされます。

エントリ対象期間についてそれぞれ以下で算出
起点 = 現在時刻-birth_delay_limit_time
終点 = 現在時刻+preset_execution_time

  • 対象がエントリ期間内であった場合
    ( 起点 <= 予約時刻 <= 終点 AND 状態 == standby )

  • 対象の終了予約時刻が既に経過していた場合
    ( タイプ == Death AND 予約時刻 <= 現在時刻 AND 状態 == standby )

Note

プリセットタイマーへのエントリ処理と同タイミングで以下の処理も適用されます。

  • 遅延保護時間を経過してしまった予約の無効化処理 ( Invalidatedへの状態遷移 )
    ( 予約時刻 < 起点 AND 状態==standby )

  • 履歴保持期間を経過した予約情報の削除処理
    期限 = 現在時刻-schedule_history_duration_days
    ( 状態 == Stillbirth AND birth_time < 期限 ) OR ( 状態 == Dead AND death_time < 期限 )


タイマーが発火するとアクション実行待ちキューに予約アクション(Planオブジェクト)がキューイングされます。アクション実行待ちキューは、アクションワーカーに監視されており、ワーカーはアクション実行キューからPlanオブジェクトを取り出してアクションを実行します。


予約時刻に対するバリデーションについて

予約時刻は、各種起動パラメータに従って幾つかのバリデーションが適用されます。

生存期間妥当性チェック

term型の場合のみ実行されます。
birth_time >= death_timeの場合、生存期間が存在しないと判断され、予約不可となります。エラーコードは、400 BadRequestとなります。

最小生存期間チェック

term型の場合のみ実行されます。
birth_time+minimum_life_term > death_timeの場合、最小生存期間を満足できないと判断され、予約不可となります。エラーコードは、400 BadRequestとなります。

実行保護時間チェック

予約の登録、及び変更時に実行されます。point型の場合は、birth_timeに対して、term型の場合は、birth_timeとdeath_timeに対してチェックされます。
予約時刻(birth_time or death_time)-execution_guard_time <= 現在時刻の場合、既に予約実行時刻は過去と判断され、予約不可となります。エラーコードは、406 Not Acceptableとなります。

実行遅延保護時間チェック

同一リソースに対して同時に実行される予約が存在しないことをチェックします。本チェックは、resource_idが指定されている予約にのみ適用されます。

  • point型の場合
    実行開始時刻(birth_time)-execution_delay_guard_time
    実行開始時刻(birth_time)+execution_delay_guard_timeの期間において、同一resource_idに対する実行済(Dead状態)ではない予約が存在する場合は予約不可となります。エラーコードは、409 Conflictとなります。

  • term型の場合
    実行開始時刻(birth_time)-execution_delay_guard_time
    実行終了時刻(death_time)+execution_delay_guard_timeの期間において、同一resource_idに対する実行済(Dead状態)ではない予約が存在する場合は予約不可となります。エラーコードは、409 Conflictとなります。