Pipeline

Pipelineは、サーバレスなコンテナ駆動型のワークフローを実現するサービスです。
以下のモデル図のようにPipelineは複数のPipelineTaskによって構成されます。PipelineTaskは複数のStepを保有します。


定義リソース

Pipelineサービスで管理するリソースは、PipelineTaskとPipelineです。PipelineTaskはワークフローが実行する個々の処理定義を表現するモデルです。PipelineはPipelineTaskを組み合わせた全体のワークフロー定義を表現するモデルです。

PipelineTask

PipelineTaskは、単一あるいは複数のQmonusスクリプトを管理するリソースです。スクリプトは、正常処理をdoScriptフィールドに定義し、キャンセル処理をundoScriptフィールドに定義することができます。また、doScript/undoScriptのセットを1stepとして管理し、複数のstepを保有することができます。

PipelineTaskのフィールド

category
タスクのカテゴリを指定します。カテゴリはタスクを分類するための単なるラベルです。
name
タスクの名前を指定します。ユニークでなければなりません。
steps
複数の処理stepをリストで保持します。
steps.doScript
正常処理として実行するスクリプトを記述します。
steps.undoScript
キャンセル処理として実行するスクリプトを記述します。

Tip

PipelineTaskのスクリプト実行空間には、paramsという辞書変数がプリセットされます。後述するPipelineRunの実行時にパイプラインに対して任意の辞書を渡すことができ、それらはparamsに格納されます。スクリプトの中ではparams辞書を参照及び編集することができます。編集されたparamsは後続のタスクに引き継がれます。また、スクリプト空間ではシナリオなどと同様に各種組込オブジェクトやATOMを利用してプログラミングすることができます。

Pipeline

Pipelineは、複数のPipelineTaskで構成されるワークフローを管理するリソースです。Pipelineの保有するPipelineTaskリストではPipelineTaskの実行依存関係と実行条件を定義することができます。これにより、タスクのfan-out/fan-in並列実行や条件分岐を実現しています。

Pipelineのフィールド

category
パイプラインのカテゴリを指定します。カテゴリはパイプラインを分類するための単なるラベルです。
name
パイプラインの名前を指定します。ユニークでなければなりません。
tasks
複数のタスクをリストで保持します。
tasks.name
タスクに紐付けるPipelineTaskの名前を指定します。
tasks.ref
パイプライン上においてタスクを識別する名前を指定します。同一のPipeline上で同一のPipelineTaskを複数回利用する場合があるため、Pipeline上でユニークに名前付けするためのラベルです。
tasks.dependsOn
該当タスクの実行時に完了していなければならないタスクのrefをリストで保持します。複数のタスクが指定されている場合は、fan-in動作となります。
tasks.condition
該当タスクを実行するか否かを判定するスクリプトを記述します。


ランタイムリソース

Qmonus SDKは、Pipelineの実行が指示されるとPipelineのランタイム管理インスタンスとなるPipelineRunを生成します。
PipelineRunは、PipelineTaskの実行インスタンスであるPipelineTaskRunを保有します。また同時に依存関係を解析してロールバックに必要なPipelineTaskも生成します。実際にロールバックで実行されるのは、PipelineTaskリソースのStepの中で定義されたundoScriptです。

Note

PipelineTaskRunは、Docs » FAQ » Q9. サーバレス実行で紹介しているSDK Runnerとして実行されます。従ってTaskは、kubernetes Job相当となり、Task内のStepはJobから生成されたPod上のコンテナとして動作します。


例えば、以下のように2つのタスクで構成される単純なパイプラインを想定します。


この場合、PipelineRunは以下のような構造で生成されます。


PipelineRun状態マシン

PipelineRunは、実行状態を管理します。状態マシンは以下の通りです。processing状態で異常を検知するとaborted状態に遷移しますがその後自動でcancelling状態に遷移します。


PipelineTaskRun状態マシン

PipelineTaskRunは、実行状態を管理します。状態マシンは以下の通りです。tasks.conditionが指定されていて評価結果がFalseの場合は実行されずにskipped状態に遷移します。skipped判定されたタスクは後続のパイプラインの異常検出によるキャンセリング動作の対象外となります。


PipelineRunの作成と実行

PipelineRunの作成

  • REPLやスクリプトでの作成方法
p = await PipelineRun.create(<Pipeline名>)

チュートリアル(編集中)

以下の図のように少し複雑なワークフローを持つパイプラインを作成してみましょう。

category: example
name: Pipeline-1
tasks:
- dependsOn: []
  ref: Task-1
  name: Task-1
- dependsOn:
  - Task-1
  ref: Task-2
  name: Task-2
- dependsOn:
  - Task-2
  ref: Task-3
  name: Task-3
- dependsOn:
  - Task-3
  ref: Task-4
  name: Task-4
- dependsOn:
  - Task-2
  ref: Task-5
  name: Task-5
- dependsOn:
  - Task-5
  ref: Task-6
  name: Task-6
- dependsOn:
  - Task-2
  ref: Task-7
  name: Task-7
- dependsOn:
  - Task-7
  ref: Task-8
  name: Task-8
- dependsOn:
  - Task-7
  ref: Task-9
  name: Task-9
- dependsOn:
  - Task-4
  - Task-6
  - Task-8
  - Task-9
  ref: Task-10
  name: Task-10
- dependsOn:
  - Task-10
  ref: Task-11
  name: Task-11


以下はFrontal向けの情報

PipelineTaskのAPI仕様

PipelineTask登録

POST /pipelinetasks
{
  "type": "object",
  "required": [
    "category",
    "name",
    "steps"
  ],
  "properties": {
    "category": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "steps": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "doScript": {
            "type": "string",
            "format": "python"
          },
          "undoScript": {
            "type": "string",
            "format": "python"
          }
        },
        "required": [
          "doScript"
        ]
      }
    }
  }
}


PipelineTask変更

PUT /pipelinetasks/{name}
{
  "type": "object",
  "required": [
    "category",
    "name",
    "steps"
  ],
  "properties": {
    "category": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "steps": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "doScript": {
            "type": "string",
            "format": "python"
          },
          "undoScript": {
            "type": "string",
            "format": "python"
          }
        },
        "required": [
          "doScript"
        ]
      }
    }
  }
}


PipelineTask削除

DELETE /pipelinetasks/{name}


PipelineTask取得

GET /pipelinetasks/{name}


PipelineTask一覧

GET /pipelinetasks/{name}


PipelineのAPI仕様

Pipeline登録

POST /pipelines
{
  "type": "object",
  "required": [
    "category",
    "name",
    "tasks"
  ],
  "properties": {
    "category": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "tasks": {
      "type": "array",
      "minItems": 1,
      "items": {
        "type": "object",
        "required": [
          "ref",
          "name",
          "dependsOn"
        ],
        "properties": {
          "ref": {
            "type": "string",
            "description": "task ref"
          },
          "name": {
            "type": "string",
            "description": "pipeline task name"
          },
          "condition": {
            "type": "string",
            "format": "python"
          },
          "dependsOn": {
            "type": "array",
            "items": {
              "type": "string",
              "description": "dependent task ref"
            }
          }
        }
      }
    }
  }
}


Pipeline変更

PUT /pipelines/{name}
{
  "type": "object",
  "required": [
    "category",
    "name",
    "tasks"
  ],
  "properties": {
    "category": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "tasks": {
      "type": "array",
      "minItems": 1,
      "items": {
        "type": "object",
        "required": [
          "ref",
          "name",
          "dependsOn"
        ],
        "properties": {
          "ref": {
            "type": "string",
            "description": "task ref"
          },
          "name": {
            "type": "string",
            "description": "pipeline task name"
          },
          "condition": {
            "type": "string",
            "format": "python"
          },
          "dependsOn": {
            "type": "array",
            "items": {
              "type": "string",
              "description": "dependent task ref"
            }
          }
        }
      }
    }
  }
}


Pipeline削除

DELETE /pipelines/{name}


Pipeline取得

GET /pipelines/{name}


Pipeline一覧

GET /pipelines


PipelineRunのAPI仕様

PipelineRunの作成

POST /pipelineruns
{
  "type": "object",
  "required": [
    "name",
    "parameters"
  ],
  "properties": {
    "name": {
      "type": "string"
    },
    "parameters": {
      "type": "object"
    }
  }
}


PipelineRunの操作

PATCH /pipelineruns/{instance_id}
{
  "type": "object",
  "required": [
    "operation"
  ],
  "properties": {
    "operation": {
      "type": "string",
      "enum": [
        "run",
        "recovery",
        "cancel"
      ]
    }
  }
}


PipelineRunの削除(未実装)

DELETE /pipelineruns/{instance_id}


PipelineRunの強制削除(未実装)

DELETE /pipelineruns/{instance_id}?force=true


PipelineRunの取得

GET /pipelineruns/{instance_id}


PipelineRunの一覧

GET /pipelineruns


Pipeline及びPipelineTaskの通知チャネル

チャネル名: xaas.pipeline.channel


Pipelineの通知内容

event content.instance_id content.name content.status
Pipeline インスタンスID パイプライン名 processing, cancelling, complete, cancelled, aborted

PipelineTaskの通知内容

event content.instance_id content.process_type content.ref content.name content.owner_id content.runner_id content.status
PipelineTask インスタンスID do, undo 参照ラベル タスク名 PipelineRunのインスタンスID SDKRunnerのID running, succeeded, failed, skipped