Pipeline
Pipelineは、サーバレスなコンテナ駆動型のワークフローを実現するサービスです。
以下のモデル図のようにPipelineは複数のPipelineTaskによって構成されます。PipelineTaskは複数のStepを保有します。
定義リソース
Pipelineサービスで管理するリソースは、PipelineTaskとPipelineです。PipelineTaskはワークフローが実行する個々の処理定義を表現するモデルです。PipelineはPipelineTaskを組み合わせた全体のワークフロー定義を表現するモデルです。
PipelineTask
PipelineTaskは、単一あるいは複数のQmonusスクリプトを管理するリソースです。スクリプトは、正常処理をdoScriptフィールドに定義し、キャンセル処理をundoScriptフィールドに定義することができます。また、doScript/undoScriptのセットを1stepとして管理し、複数のstepを保有することができます。
PipelineTaskのフィールド
category
- タスクのカテゴリを指定します。カテゴリはタスクを分類するための単なるラベルです。
name
- タスクの名前を指定します。ユニークでなければなりません。
steps
- 複数の処理stepをリストで保持します。
steps.doScript
- 正常処理として実行するスクリプトを記述します。
steps.undoScript
- キャンセル処理として実行するスクリプトを記述します。
Tip
PipelineTaskのスクリプト実行空間には、params
という辞書変数がプリセットされます。後述するPipelineRunの実行時にパイプラインに対して任意の辞書を渡すことができ、それらはparams
に格納されます。スクリプトの中ではparams
辞書を参照及び編集することができます。編集されたparams
は後続のタスクに引き継がれます。また、スクリプト空間ではシナリオなどと同様に各種組込オブジェクトやATOMを利用してプログラミングすることができます。
Pipeline
Pipelineは、複数のPipelineTaskで構成されるワークフローを管理するリソースです。Pipelineの保有するPipelineTaskリストではPipelineTaskの実行依存関係と実行条件を定義することができます。これにより、タスクのfan-out/fan-in並列実行や条件分岐を実現しています。
Pipelineのフィールド
category
- パイプラインのカテゴリを指定します。カテゴリはパイプラインを分類するための単なるラベルです。
name
- パイプラインの名前を指定します。ユニークでなければなりません。
tasks
- 複数のタスクをリストで保持します。
tasks.name
- タスクに紐付けるPipelineTaskの名前を指定します。
tasks.ref
- パイプライン上においてタスクを識別する名前を指定します。同一のPipeline上で同一のPipelineTaskを複数回利用する場合があるため、Pipeline上でユニークに名前付けするためのラベルです。
tasks.dependsOn
- 該当タスクの実行時に完了していなければならないタスクのrefをリストで保持します。複数のタスクが指定されている場合は、fan-in動作となります。
tasks.condition
- 該当タスクを実行するか否かを判定するスクリプトを記述します。
ランタイムリソース
Qmonus SDKは、Pipelineの実行が指示されるとPipelineのランタイム管理インスタンスとなるPipelineRunを生成します。
PipelineRunは、PipelineTaskの実行インスタンスであるPipelineTaskRunを保有します。また同時に依存関係を解析してロールバックに必要なPipelineTaskも生成します。実際にロールバックで実行されるのは、PipelineTaskリソースのStepの中で定義されたundoScript
です。
Note
PipelineTaskRunは、Docs » FAQ » Q9. サーバレス実行
で紹介しているSDK Runnerとして実行されます。従ってTaskは、kubernetes Job相当となり、Task内のStepはJobから生成されたPod上のコンテナとして動作します。
例えば、以下のように2つのタスクで構成される単純なパイプラインを想定します。
この場合、PipelineRunは以下のような構造で生成されます。
PipelineRun状態マシン
PipelineRunは、実行状態を管理します。状態マシンは以下の通りです。processing
状態で異常を検知するとaborted
状態に遷移しますがその後自動でcancelling
状態に遷移します。
PipelineTaskRun状態マシン
PipelineTaskRunは、実行状態を管理します。状態マシンは以下の通りです。tasks.condition
が指定されていて評価結果がFalseの場合は実行されずにskipped
状態に遷移します。skipped
判定されたタスクは後続のパイプラインの異常検出によるキャンセリング動作の対象外となります。
PipelineRunの作成と実行
PipelineRunの作成
- REPLやスクリプトでの作成方法
p = await PipelineRun.create(<Pipeline名>)
チュートリアル(編集中)
以下の図のように少し複雑なワークフローを持つパイプラインを作成してみましょう。
category: example
name: Pipeline-1
tasks:
- dependsOn: []
ref: Task-1
name: Task-1
- dependsOn:
- Task-1
ref: Task-2
name: Task-2
- dependsOn:
- Task-2
ref: Task-3
name: Task-3
- dependsOn:
- Task-3
ref: Task-4
name: Task-4
- dependsOn:
- Task-2
ref: Task-5
name: Task-5
- dependsOn:
- Task-5
ref: Task-6
name: Task-6
- dependsOn:
- Task-2
ref: Task-7
name: Task-7
- dependsOn:
- Task-7
ref: Task-8
name: Task-8
- dependsOn:
- Task-7
ref: Task-9
name: Task-9
- dependsOn:
- Task-4
- Task-6
- Task-8
- Task-9
ref: Task-10
name: Task-10
- dependsOn:
- Task-10
ref: Task-11
name: Task-11
以下はFrontal向けの情報
PipelineTaskのAPI仕様
PipelineTask登録
POST /pipelinetasks
{
"type": "object",
"required": [
"category",
"name",
"steps"
],
"properties": {
"category": {
"type": "string"
},
"name": {
"type": "string"
},
"steps": {
"type": "array",
"items": {
"type": "object",
"properties": {
"doScript": {
"type": "string",
"format": "python"
},
"undoScript": {
"type": "string",
"format": "python"
}
},
"required": [
"doScript"
]
}
}
}
}
PipelineTask変更
PUT /pipelinetasks/{name}
{
"type": "object",
"required": [
"category",
"name",
"steps"
],
"properties": {
"category": {
"type": "string"
},
"name": {
"type": "string"
},
"steps": {
"type": "array",
"items": {
"type": "object",
"properties": {
"doScript": {
"type": "string",
"format": "python"
},
"undoScript": {
"type": "string",
"format": "python"
}
},
"required": [
"doScript"
]
}
}
}
}
PipelineTask削除
DELETE /pipelinetasks/{name}
PipelineTask取得
GET /pipelinetasks/{name}
PipelineTask一覧
GET /pipelinetasks/{name}
PipelineのAPI仕様
Pipeline登録
POST /pipelines
{
"type": "object",
"required": [
"category",
"name",
"tasks"
],
"properties": {
"category": {
"type": "string"
},
"name": {
"type": "string"
},
"tasks": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": [
"ref",
"name",
"dependsOn"
],
"properties": {
"ref": {
"type": "string",
"description": "task ref"
},
"name": {
"type": "string",
"description": "pipeline task name"
},
"condition": {
"type": "string",
"format": "python"
},
"dependsOn": {
"type": "array",
"items": {
"type": "string",
"description": "dependent task ref"
}
}
}
}
}
}
}
Pipeline変更
PUT /pipelines/{name}
{
"type": "object",
"required": [
"category",
"name",
"tasks"
],
"properties": {
"category": {
"type": "string"
},
"name": {
"type": "string"
},
"tasks": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": [
"ref",
"name",
"dependsOn"
],
"properties": {
"ref": {
"type": "string",
"description": "task ref"
},
"name": {
"type": "string",
"description": "pipeline task name"
},
"condition": {
"type": "string",
"format": "python"
},
"dependsOn": {
"type": "array",
"items": {
"type": "string",
"description": "dependent task ref"
}
}
}
}
}
}
}
Pipeline削除
DELETE /pipelines/{name}
Pipeline取得
GET /pipelines/{name}
Pipeline一覧
GET /pipelines
PipelineRunのAPI仕様
PipelineRunの作成
POST /pipelineruns
{
"type": "object",
"required": [
"name",
"parameters"
],
"properties": {
"name": {
"type": "string"
},
"parameters": {
"type": "object"
}
}
}
PipelineRunの操作
PATCH /pipelineruns/{instance_id}
{
"type": "object",
"required": [
"operation"
],
"properties": {
"operation": {
"type": "string",
"enum": [
"run",
"recovery",
"cancel"
]
}
}
}
PipelineRunの削除(未実装)
DELETE /pipelineruns/{instance_id}
PipelineRunの強制削除(未実装)
DELETE /pipelineruns/{instance_id}?force=true
PipelineRunの取得
GET /pipelineruns/{instance_id}
PipelineRunの一覧
GET /pipelineruns
Pipeline及びPipelineTaskの通知チャネル
チャネル名: xaas.pipeline.channel
Pipelineの通知内容
event | content.instance_id | content.name | content.status |
---|---|---|---|
Pipeline | インスタンスID | パイプライン名 | processing, cancelling, complete, cancelled, aborted |
PipelineTaskの通知内容
event | content.instance_id | content.process_type | content.ref | content.name | content.owner_id | content.runner_id | content.status |
---|---|---|---|---|---|---|---|
PipelineTask | インスタンスID | do, undo | 参照ラベル | タスク名 | PipelineRunのインスタンスID | SDKRunnerのID | running, succeeded, failed, skipped |