CLA (Closed Loop Automation)

CLA(Closed Loop Automation)は、ETSI ZSM(Zero touch network & Service Management)に準拠したクローズドループの実装をサポートするサービスです。尚、本機能はv22.7LTS以降のバージョンで利用できます。


クローズドループについて

クローズドループは、特定の目標を維持することを目指して、管理対象エンティティを監視および制御するためのメカニズムです。クローズドループは論理的に可変数のステージに分解でき、各ステージはクローズドループの機能の一部を実行する役割を果たします。 よく知られているクローズドループの種別は、4つのステージ(Observe, Orient, Decide, Act)で構成されるOODAループと、同じく4つのステージ(Monitor, Analyze, Plan, Execute)とKnowledgeで構成されるMAPE-Kです。 クローズドループ自動化(CLA)は、監視データからのフィードバックに基づいてネットワークを管理できる自動化されたプロセスのステージの組み合わせであり、システムの運用から人的関与を削減または排除します。管理システムのCLAは、管理サービス(データ、分析、ポリシー、オーケストレーションなど)の組み合わせと連鎖によって実現でき、ネットワークを常に監視および評価し、目標が達成されていない場合に修正アクションを実行できる自律システムを提供します。 CLAの目的は、人間による直接的な介入を減らすことですが、自律システムが人間のオペレーターとの対話を許可することが重要です。このような相互作用は、CLの目標の指定と変更、および自律システムのパフォーマンスとそれによって実行されるアクションの最終的な承認/拒否を監視するために使用できます。 この機能の焦点は、CLの作成と実行、およびCL間の相互運用性を可能にする汎用フレームワークとして動作することです。


ZSMフレームワークにおけるクローズドループについて

ZSMフレームワークでは、CLは、さまざまな段階の機能実現(ETSI GS ZSM 002)で定義された管理サービスの相互作用によって実現されます。 CLは、ステージを使用して、データを自律的に収集し、決定を下し、1つ以上の管理対象エンティティに対してアクションを実行できます。CLを構成する一連のステージには、1つまたは複数のソース(1つ以上の管理対象エンティティや外部データソースなど)からのデータの収集を担当する「Monitoring」と呼ばれる少なくとも1つのステージと、「Execution」と呼ばれる1つのステージが含まれます。これは1つ以上の管理対象エンティティに対してアクションを実行する責任があります。実行が行われる管理対象エンティティは、データの取得元のエンティティと必ずしも同じではありません。2つの基本段階、つまり監視と実行に加えて、運用データと履歴データの分析、および分析の結果に基づく決定を担当する他の段階が存在する場合があります。データの「Monitoring」と「Execution」の間の中間段階の数と機能は、実装と展開の選択によって異なります。この仕様では、ZSMフレームワーク内でCLを構成する一定数のステージを義務付けていませんが、少なくとも3つを推奨しています。1つはMonitoring、1つはExecution、もう1つはAnalyzeとDecisionです。AnalyzeとDecisionは、さらにいくつかの段階で構成することができます。




CLG(Closed Loop Governance)

CLGは、外部エンティティがCLのライフサイクルを管理し、その動作を構成できるようにする一連の機能です。CLGを使用して、ステータス(ヘルスを含む)やパフォーマンス情報などのCLに関する情報を取得することもできます。CLGが持つCapabilityとして以下の機能実装が推奨されています。

  • CLのCreate/Update/Delete/Get/List
  • CLの停止/起動
  • CLの統計情報収集の有効化/無効化
  • CLのポリシー、ルール、トリガー、優先順位の構成

CLGによるCLのライフサイクルマネジメントについて各フェーズとアクティビティの概念図は以下の通りです。



CLは、ZSMフレームワーク内の管理対象エンティティであり、独自のライフサイクルがあります。 CLのライフサイクルは、CLの各動作フェーズ、アクティビティ、および状態を定義しています。これは、ZSMフレームワークの所有者、またはCLと対話する他のエンティティに、CLの可能なアクティビティと状態に関する統一されたビューを提供し、相互作用するエンティティとそれらの間の相互作用(CLの作成、CLの展開、CLの調整、CLの変更、CLの終了など)を識別します。 CLのDesign-Timeは、準備フェーズで構成されます。 CLは、CLモデルに基づいて設計時に定義されます。 CLのRun-Timeは、Commissioning、Operation、およびDecommissioningの各フェーズで構成されます。これらの操作はCLGを介してコントロールされます。 尚、CLフェーズの概念は、CLステージの概念とは異なります。CLステージは、CL内部の機能分割を表します。CLフェーズは、CLの外部のエンティティとの相互作用から生じるアクティビティと状態を表します。


Closed Loop Model

Qmonus SDKにおけるクローズドループ概念モデルは、以下の通りです。

ClosedLoopクラスは、ClosedLoopGoalとClosedLoopPolicyを持つことができます。ZSMではClosedLoopGoalを少なくとも1つ持つことを推奨していますがQmonus SDKでは目標のないクローズドループを作ることも許可しています(単純な定周期デーモンなどの用途を想定)。また管理対象となるManagedEntityを複数保有することができます。こちらもZSMでは少なくとも1つのターゲット管理エンティティが必要ですが管理エンティティを持たないクローズドループの作成を許可しています。ClosedLoopComponentはZSMの規定に従い、1つ以上のClosedLoopComponentを含む必要があります。最後にonSpawnHook、preOperationComponent、postOperationComponentについてはQmonus SDK独自の拡張です。クローズドループがインスタンス化されるタイミングやoperationフェーズの前後に任意の処理を差し込むことでクローズドループ共通の振る舞いを拡張するために提供されます。詳細はClosedLoopTypeの解説を参照ください。


ClosedLoopクラス

ClosedLoopクラスには以下の属性が定義されています。

  • closedLoopInstanceUniqueId
    CLインスタンスの識別子です。文字列型で必須属性です。

  • closedLoopLifeCyclePhases
    サポートされているライフサイクルフェーズのリストです。リストで許可される値は、preparation、commissioning、operation、およびdecommissioningです。 Qmonus SDKのClosedLoop組込みクラスのデフォルト値は全てのフェーズが設定されます。

  • currentClosedLoopLifeCyclePhase
    CLがどのライフサイクルフェーズにあるかを示す属性です。

  • closedLoopPriority
    CLの優先度を示します。同じ管理対象エンティティに対するアクションの競合を回避するように設定することができます。設定値や使い方についてZSMではOut of scopeでありフレームワークのユーザが自由に規定できるとしており、Qmonus SDKにおいてはinteger型の数値で設定することができます。使い方についてはプラグインのユーザ定義領域と位置付けています。

  • closedLoopTypeDescription
    CLタイプの説明を示すオプション属性です。

  • closedLoopGoal
    CLの目標を示す属性です。CLの目標は、CLが満たす目的を決定します。目標は、CLの望ましい動作と、CLの実行中に満たす必要のある期待を指定します。 preparationフェーズでは、クローズドループの目標は、ZSMフレームワークのベンダー、ZSMフレームワークの所有者、またはZSMフレームワークの所有者に代わって他のエンティティによって定義されます。特定のクローズドループに対して複数の目標を定義できます。 commissioningフェーズでは、クローズドループの目標は、ZSMフレームワークの所有者、またはZSMフレームワークの所有者に代わって他のエンティティによって初期の参照値で構成されます。 operationフェーズでは、クローズドループの目標パラメーターの値をZSMフレームワークの所有者が変更することも、ZSMフレームワークの所有者に代わって他のエンティティが変更してCLの動作を調整することもできます。CL目標は、CLGサービスの目標管理機能を使用してoperationフェーズで管理できます。 Qmonus SDKではclosedLoopGoalを辞書形式で宣言的に記述する必要があります。

  • manageableEntityList
    CLで管理できるエンティティの種類を示す属性です。エンティティはインスタンス化されたエンティティではなく、エンティティの型を識別する文字列です。Qmonus SDKではATOMクラスのクラス名を指定することを推奨しています。また、ClosedLoopは予約語扱いとなり、組込みのClosedLoopクラスを意味します。本属性のデフォルト値はClosedLoopのみが指定されたリストが設定されます。

  • targetEntityList
    CLインスタンスが正常にデプロイされ、インスタンス化された後に管理する必要があるエンティティを示す属性です。Qmonus SDKでは、管理対象エンティティの型とインスタンスIDのリストとして指定します。デフォルトは空リストが設定されます。

  • closedLoopComponentList
    CLを構成する機能コンポーネントを示す属性です。CLの各ステージの実態となるClosedLoopComponentの識別子をリストで保持します。CLは、このリスト順に従ってClosedLoopComponentを駆動し続けます。

  • closedLoopPolicy
    CLインスタンスに適用可能なポリシーを定義する属性です。CLがアクションを決定付ける際の行動指針を定義できます。Qmonus SDKでは、closedLoopPolicyを辞書形式で宣言的に記述する必要があります。

  • preOperationComponent
    CLインスタンスがcommissioningされ、operationフェーズに移行する前に実行されるClosedLoopComponentを指定するオプション属性です。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。

  • postOperationComponent
    CLインスタンスがdecommissioningされ、operationフェーズを終了する際に実行されるClosedLoopComponentを指定するオプション属性です。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。

  • metadata
    CLインスタンスに任意のメタ情報を保有させることができるオプション属性です。CLインスタンス固有の情報を持たせることで各種ClosedLoopComponent処理から参照、更新することが可能です。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。

  • _imagePullPolicy
    CLインスタンスがcommissioningされるとCLインスタンスの実行実態はkubernetes JobのPodに移行します。Podのイメージをコンテナリポジトリからpullする際のポリシーを指定できるオプション属性です。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。

  • _secret
    CLインスタンスがcommissioningされるとCLインスタンスの実行実態はkubernetes JobのPodに移行します。Podに対してSecret情報をマウントさせたい場合に任意の辞書を設定できるオプション属性です。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。


ClosedLoopComponentクラス


ClosedLoopComponentは、CLの各ステージでの動作を表現する管理概念です。ClosedLoopComponentクラスには以下の属性が定義されています。

  • category
    ClosedLoopComponentのカテゴリを設定する必須属性です。プラグイン開発者の識別を助けるために用意されたCLインスタンスの動作には何ら影響しないパラメータです。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。
  • closedLoopComponentName ClosedLoopComponentの名前を設定する必須属性です。本属性はZSM標準で規定されておらずQmonus SDK独自拡張となります。
  • closedLoopComponentDescription
    ClosedLoopComponentの機能を説明するオプション属性です。
  • inputDataList
    ClosedLoopComponentがCLの内部または外部の他のエンティティから受信できる必須およびオプションのパラメータをリスト定義するためのオプション属性です。
  • outputDataList
    ClosedLoopComponentがCLの内部または外部の他のエンティティに提供できるパラメータをリスト定義するためのオプション属性です。
  • producedManagementCapabilitiesList
    ClosedLoopComponentによって提供される機能をリスト定義するための属性です。必ず1つ以上の機能を提供する必要があります。Qmonus SDKではFunctionプラグイン名を指定する必要があります。
  • consumedManagementCapabiltiesList
    ClosedLoopComponentが機能するために消費する機能をリスト定義するためのオプション属性です。Qmonus SDKでは本属性について定義されていますが、ランタイムでは使用していません。


ManagedEntityクラス

ManagedEntityクラスには以下の属性が定義されています。

  • managedEntityType
    管理対象エンティティの型を示します。許可される値は、マネージドリソース、マネージドサービス、またはClosedLoopです。Qmonus SDKではClosedLoopは予約語です。マネージドリソースやマネージドサービスの識別にはATOMクラス名を推奨しています。

  • managedEntityId
    管理対象エンティティの識別子を示します。Qmonus SDKでは、ATOMインスタンスのidentifierフィールドの値もしくはinstance属性、あるいはClosedLoopインスタンスのclosedLoopInstanceUniqueIdを指定します。


ClosedLoopTypeクラス

上述したClosedLoopクラスの属性は多岐に渡り、インスタンス生成時に都度属性を設定するのは冗長です。ClosedLoopTypeクラスは、ClosedLoopインスタンスを生成するテンプレート的な位置付けで提供されています。ClosedLoopTypeに各種属性をプリセットしておくことでCLGから最小限のコードでインスタンス生成が可能になります。尚、ClosedLoopTypeは、ZSM標準には規定されておらずQmonus SDK独自拡張となります。

  • category
    ClosedLoopクラスのcategoryに展開される属性です。必須属性となります。
  • closedLoopTypeName ClosedLoopTypeをユニークに識別する名前です。必須属性となります。
  • closedLoopPriority ClosedLoopクラスのclosedLoopPriorityに展開される属性です。必須属性となります。
  • closedLoopTypeDescription ClosedLoopクラスのclosedLoopTypeDescriptionに展開される属性です。
  • closedLoopGoal ClosedLoopクラスのclosedLoopGoalに展開される属性です。
  • manageableEntityList ClosedLoopクラスのmanageableEntityListに展開される属性です。
  • preOperationComponent ClosedLoopクラスのpreOperationComponentに展開される属性です。
  • postOperationComponent ClosedLoopクラスのpostOperationComponentに展開される属性です。
  • closedLoopComponentList ClosedLoopクラスのclosedLoopComponentListに展開される属性です。必須属性となります。
  • closedLoopPolicy ClosedLoopクラスのclosedLoopPolicyに展開される属性です。
  • metadata ClosedLoopクラスのmetadataに展開される属性です。
  • onSpawnHook ClosedLoopクラスのonSpawnHookに展開される属性です。
  • podOptions closedloopを動作させるdeploymentマニフェストに追加するパラメータを記載します。本機能はv23.2LTSで追加されました。podOptionsで指定可能なパラメータは以下になります。
key valueType 説明 指定例
serviceAccount str template.spec.service_account_nameを指定します。 "hoge@example.com"
nodeSelector object affinity, tolerationsを使用する場合に指定します。 {"key":"hoge", "memory":"256Mi"}, "value":"fuga"}
secretVolumeMounts array secretVolume関連のパラメータをarray形式で指定します。 [{"name":"hoge", "defaultMode":384, "mountPath": "/var/secrets", "readOnly": False}]
resources object コンテナに割り当てるリソース制約を指定します。 {"requests"{"cpu":"50m", "memory":"256Mi"}, "limits":{"cpu":"512m", "memory":"1Gi"}}
annotations object メタデータとして渡す注釈を指定します。 {"key":"hoge"}

Note

v23.2LTS-patch20241022からPodOptionsannotationsが指定可能になりました。


CLC(Closed Loop Coordination)

CLCは、実行されている複数のCLを調整できるようにする一連の機能であり、主な目的は、パフォーマンスの向上と目標の達成です。 CLCは、実行時の複数のクローズドループ間の相互作用として起こりうる目標または問題の委任とエスカレーション、アクションの調整、各CLステージによって生成された情報の共有、競合するCLの調整など様々な調整課題を解決するためのサービス機能です。CL間の競合は、CLの操作に悪影響を与える可能性があります。競合は、同じまたは異なる管理対象エンティティのセットを含む2つ以上のCL間で発生する可能性があります。
ZSMフレームワーク内の2つのCL間の関係は、階層CLまたはピアCLとして分類できます。階層関係は、あるクローズドループが別のクローズドループを制御することを許可されている場合です。ピア関係は、2つのクローズドループが互いの動作に影響を与える理由がある場合ですが、一方のクローズドループは他方の責任を負わず、両方が独立して存在します。 CLCが持つCapabilityとして以下の機能実装が推奨されています。

  • 特定のスコープを共有する個々のクローズドループの目標を調整する機能
  • 協力などのクローズドループ間のさまざまな相互作用タイプを識別する機能、対立または依存関係
  • パラメーターの競合、メトリックの競合、間接的な競合など、クローズドループ間のさまざまなタイプの競合を識別する機能
  • 競合解決メカニズムなどの適切なメカニズムを使用して、クローズドループ間のさまざまな相互作用に対処する機能
  • クローズドループの提案されたアクションの実行前に、そのようなアクションが他のクローズドループまたは管理対象エンティティに望ましくない影響を引き起こす可能性があることを識別する機能(実行前と実行後の調整、同時実行の調整など)
  • 実行後のクローズドループアクションの影響と有効性を評価する機能(影響評価など)

CLCは、さまざまな方法でさまざまな時間に相互作用できます。これらのサービスの典型的な発生時間を説明する例示的なタイムラインは以下の通りです。




Qmonus SDKにおけるデプロイメントモデル

Qmonus SDKにおけるクローズドループのデプロイメントモデルは、Qmonus SDKのRunnerモードをベースに構築されています。 従ってクローズドループ単位にKubernetes Jobとして新たなコンテナで実行されます。全てのクローズドループコンテナはQmonus SDKのホストインスタンスから必要な設定を引き継ぐことでデータベースを共有することが可能となり、ZSMで規定されているKnowledgeコンポーネントのように全てのクローズドループが共通のデータにアクセスすることが可能になります。 以下にデプロイメントモデルの概念図を記載します。



CLAは、組込みコンポーネントとして提供されるClosedLoopComponentClosedLoopTypeClosedLoopCLGCLCによって実現され、CLGがClosedLoopを操作するための各種APIを提供します。 各コンポーネントについて以下に解説します。

ClosedLoopComponent

ClosedLoopComponentは、CLステージとして動作する機能です。上述したZSMで規定されているデータモデルで実装されており、producedManagementCapabilitiesListにFunctionプラグイン名を指定することでCLステージとしてFunctionプラグインを実行します。inputDataListには当該ClosedLoopComponentに渡す必要がある引数名を列挙します。ClosedLoopComponentがFunctionプラグインを実行時にClosedLoopの名前空間に存在するmetadataから該当の変数を抽出し、Function実行引数として渡します。留意点としてClosedLoopComponentとして利用するFunctionの第一引数にはselfを必ず記述してください。selfには後述するClosedLoopインスタンスが格納されます。またoutputDataListにはFunctionプラグインの返り値を列挙します。Pythonでは返り値に変数名を付与できませんが、outputDataListに記述した順序で返り値に名前付けしてClosedLoopの名前空間に保存することができます。これによって後続のステージで実行されるFunctionプラグインにデータを引き継ぐことが可能となります。

以下は、ClosedLoopComponentで使用するFunctionプラグインのサンプルです。Google Cloud Pub/SubをIntegration Fabricとして配置し、購読したデータを返却します。

category: CLA Tutorial
name: monitoringStage
code: |
  async def monitoringStage(self, changedGoalStatement):
      from gcloud.aio.pubsub import SubscriberClient, SubscriberMessage
      import backoff

      class SubscriberClientWithBackoff(SubscriberClient):
          @backoff.on_exception(
              backoff.expo,
              aiohttp.ClientResponseError,
              max_tries=5,
              jitter=backoff.full_jitter
          )
          async def pull(self, *args, **kwargs):
              return await super().pull(*args, **kwargs)

      messages = []
      try:
          async with SubscriberClientWithBackoff(
              service_file=self.metadata["SA"]
          ) as client:
              _: List[SubscriberMessage] = await client.pull(
                  self.metadata["sub"],
                  max_messages=128
              )
              if _:
                  await client.acknowledge(
                      self.metadata["sub"],
                      [i.ack_id for i in _]
                  )
                  messages.extend(_)
      except Exception:
          logger.info("backoff timedout, continue")

      if changedGoalStatement:
          """Goalが変化した場合は滞留イベントの信憑性が低いと判断し、廃棄する
          """
          return []

      [
          logger.info(f"received raw data: {i.data}") for i in messages
      ]
      return [json.loads(i.data) for i in messages]


以下は、上述したmonitoringStageプラグインを利用するClosedLoopComponentプラグインのサンプルです。inputDataListに指定した変数は、ClosedLoopの名前空間から伝搬されます。outputDataListに指定した変数名でmonitoringStateプラグインのリターン値をClosedLoopの名前空間に保存します。

category: CLA example
closedLoopComponentDescription: Monitoring Stage
closedLoopComponentName: monitoring
inputDataList:
- changedGoalStatement
outputDataList:
- monitoringDatas
producedManagementCapabilitiesList:
- monitoringStage


Tip

ClosedLoopComponentのinputDataListはClosedLoopComponentの登録及び変更時に参照するFunctionプラグインの引数が自動割当されるため開発者は指定を省略することができます。ただし、ClosedLoopComponentを保存後にFunctionプラグイン自体を修正した場合は反映されませんので注意が必要です。その場合はClosedLoopComponentの再保存によって再割当してください。outputDataListについては返り値に対する名前付けをアノテーションすることができないため、原則開発者が名前付けする必要があります。outputDataListを指定せずに登録、又は変更した場合は、f"{Functionプラグイン名}Resp"という名称の変数名がoutputDataListに自動設定されることに注意してください。


ClosedLoopType

ClosedLoopTypeは、CLの各種属性値がプリセットされたいわばCLのテンプレート機能です。例えば、ネットワークスライス用CL、TNドメインのネットワークスライスサブネット用CLなどCLの用途に応じて各種属性を事前にプリセットした状態で名前付けすることができます。後述するCLGではこのClosedLoopType名を指定してClosedLoopインスタンスを生成することが可能です。もちろん不足している属性パラメータを生成時に指定することも可能です。
尚、Qmonus独自拡張としてCL属性にonSpawnHook、preOperationComponent、postOperationComponent、metadataを定義することができます。
onSpawnHookは、CLGがCLを生成する際に実行されるFunctionプラグインを指定することができます。CLのランタイム時に必要となる鍵ファイルの読み込みなど生成時に実行しておきたい処理を記述することができます。preOperationComponent、postOperationComponentはCLのoperationフェーズの開始時、終了時に任意のFunctionプラグインを実行させることができます。Qmonus SDKのCLAソリューションでは、Integration Fabricはスコープ外でユーザが好みのプロダクトを選択できるようにしています。仮にGoogle Cloud Pub/SubをIntegration Fabricにする場合などoperationフェーズの開始時にCloud Pub/Subにtopicやsubscriptionを生成、operationフェーズの終了時にtopicやsubscriptionを削除する必要があります。このような共通的な処理をCLに埋め込むことが可能です。metadataは、CLに持たせたい任意の辞書データを設定できます。metadataは全てのステージで参照でき、加工することが可能です。

以下は、onSpawnHookで利用するFunctionプラグインのサンプルです。Qmonus SDKのホストコンテナにGoogle Cloud Pub/Subの鍵をマウントできるようにして環境変数PUBSUB_SA_PATHを通じて鍵を読み込みCLインスタンスの_secret変数に代入しています。onSpawnHookの第一引数は必ずselfとしてください。selfにはCLインスタンスが代入されます。CLインスタンスの_secret属性は、commissioning時にKubernetes Secretとして自動デプロイされます。このように鍵情報をCLインスタンスがJobとして駆動するPodに伝搬したいケースにおいては必須の機能です。

category: CLA example
name: onSpawn
code: |
  async def onSpawn(self):
      import os
      import aiofiles
      secret = {
          "sa.json": None
      }
      async with aiofiles.open(
          os.environ["PUBSUB_SA_PATH"], mode="r"
      ) as f:
          data = await f.read()
          secret["sa.json"] = base64.b64encode(
              data.encode()
          ).decode("utf-8")
      self._secret = secret


以下は、preOperationComponentで利用するFunctionプラグインのサンプルです。CLインスタンスをIntegration Fabricに接続する処理などを記述しておくと便利です。 このFunctionはKubernetes JobとしてフォークされたPodで動作することになりますが、onSpawnHookで作成されたSecretを読み込んでGoogleのServiceAccountデータやProject名を取得できます。

category: CLA Tutorial
name: connectIntegrationFabric
code: |
  async def connectIntegrationFabric(self):
      import os
      import aiofiles
      from gcloud.aio.pubsub import PublisherClient, SubscriberClient
      from gcloud.aio.pubsub import PubsubMessage, SubscriberMessage

      (SA, PJ) = ("/root/secrets/sa.json", None)
      async with aiofiles.open(SA, mode="r") as f:
          _ = await f.read()
          PJ = MU(json.loads(_)).project_id

      key = "nSId" if "nSId" in self.metadata else "nSSId"
      topicName = f"E2EMD-{self.metadata[key]}"
      topic = f"projects/{PJ}/topics/{topicName}"
      sub = f"projects/{PJ}/subscriptions/{topicName}"

      if not self.metadata:
          self.metadata = {}
      (
          self.metadata["SA"],
          self.metadata["PJ"],
          self.metadata["topic"],
          self.metadata["topicName"],
          self.metadata["sub"]
      ) = (SA, PJ, topic, topicName, sub)

      await self.save()

      # topics作成
      notExists = True
      async with PublisherClient(service_file=SA) as client:
          topics = await client.list_topics(f"projects/{PJ}")
          for i in topics["topics"]:
              if i["name"] == topic:
                  notExists = False
                  break

      if notExists:
          retry = 0
          while retry < 3:
              async with PublisherClient(service_file=SA) as client:
                  try:
                      await client.create_topic(topic)

                      while Runtime.running():
                          topics = await client.list_topics(
                              f"projects/{PJ}"
                          )
                          for i in topics["topics"]:
                              if i["name"] == topic:
                                  break
                          else:
                              continue
                          break
                  except Exception as e:
                      if retry < 3:
                          retry += 1
                          await asyncio.sleep(1)
                          continue

                      logger.error(traceback.format_exc())
                      return

      # subscription作成
      notExists = True
      async with SubscriberClient(service_file=SA) as client:
          try:
              await client.get_subscription(sub)
          except Exception as e:
              logger.info(f"Create because subscription does not exist {sub}")
          else:
              notExists = False

      if notExists:
          retry = 0
          while retry < 3:
              async with SubscriberClient(service_file=SA) as client:
                  try:
                      await client.create_subscription(
                          sub,
                          topic
                      )
                  except Exception as e:
                      if retry < 3:
                          retry += 1
                          await asyncio.sleep(1)
                          continue

                      logger.error(traceback.format_exc())


同様に、postOperationComponentで利用するFunctionプラグインのサンプルです。CLインスタンスをIntegration Fabricから切断する処理などを記述しておくと便利です。

category: CLA example
name: disconnectIntegrationFabric
code: |
  async def disconnectIntegrationFabric(self):
      from gcloud.aio.pubsub import PublisherClient, SubscriberClient
      async with SubscriberClient(
          service_file=self.metadata["SA"]
      ) as client:
          await client.delete_subscription(
              self.metadata["sub"]
          )
      async with PublisherClient(
          service_file=self.metadata["SA"]
      ) as client:
          await client.delete_topic(
              self.metadata["topic"]
          )


最後にClosedLoopTypeプラグインのサンプルです。

category: CLA Tutorial
closedLoopComponentList:
- monitoring
- analysis
- decision
- execution
closedLoopPriority: 10
closedLoopTypeDescription: NSI closedloop
closedLoopTypeName: NSI
manageableEntityList:
- ClosedLoop
onSpawnHook: onSpawn
postOperationComponent: disconnect
preOperationComponent: connect


ClosedLoop

ClosedLoopは、CLインスタンスの管理エンティティです。 CLGによってCLがspawnされるとonSpawnHookが実行されます。次にCLGによってCLのcommissioningがされるとpreOperationComponentが実行され、クローズドループが開始されます。クローズドループはclosedLoopComponentListに従って順番にClosedLoopComponentを実行し続けます。実行中にCLGによってdeactivationされるとクローズドループの処理は停止します。停止状態からCLGによってactivationが行われるとクローズドループの処理が再開します。次にCLGによってdecommissioningが行われるとpostOperationComponentが実行され、クローズドループが削除されpreparation状態に遷移します。最後にCLGによってdestroyが行われるとClosedLoopインスタンスは消滅します。
これらClosedLoopの実行フローは以下の通りです。




ClosedLoopは、以下の状態遷移に従います。CLGによってspawnされるとCLインスタンスはpreparationフェーズに遷移します。このフェーズではCLインスタンスのClosedLoopGoalやClosedLoopPolicyなど各種属性を編集することが可能です。次にCLGによってcommissioningが要求されるとCLインスタンスはcommissiongフェーズに遷移し、適切なkubernetesマニフェストを生成し、自身をkubernetes Jobとして新たなPodで起動します。起動されたPod上のCLインスタンスはoperationフェーズに自律遷移します。operationフェーズではdeactivation/activation操作が可能となります。ClosedLoopComponentのアップグレードなどクローズドループの挙動を更新する際はdeactivationでループを停止(実際にはkubernetes Jobが削除されます)し、プラグインを更新後、activationすることで挙動を変更することが可能です。CLGからdecommissioningが実行されるとCLインスタンスはdecommissioningフェーズに遷移します。decommissioningフェーズとなったCLインスタンスはループ動作を停止し、kubernetesクラスタ上の自身のリソースを削除してpreparationフェーズに遷移します。




ClosedLoopの各種操作は以下を参考にしてください。

ClosedLoopインスタンスを取得する

CLインスタンスが返却されます。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)


CLインスタンスをリロードする。データベースにあるCLインスタンスの情報と再同期します。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
await instance.reload()


CLインスタンスを保存する。各種属性の変更はキーワード引数で指定できます。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
await instance.save(
    closedLoopGoal={
        "dLThptPerSlice": {
            "guaThpt": 200000,
            "maxThpt": 400000
        ),
        "uLThptPerSlice": {
            "guaThpt": 20000,
            "maxThpt": 40000
        )
    },
    closedLoopPolicy={
        "uLThptPerSlice": {
            "maxThpt": {
                "upperLimit": 700000,
                "lowerLimit": 40000,
                "increment": 50000,
                "decrement": 50000
            }
        },
        "dLThptPerSlice": {
            "maxThpt": {
                "upperLimit": 700000,
                "lowerLimit": 400000,
                "increment": 50000,
                "decrement": 50000
            }
        }
    }
)


CLインスタンスを削除する。preparationフェーズのみ可能な操作です。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
await instance.destroy()


CLインスタンスをdict型に変換する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
logger.info(json.dumps(instance.dictionary, indent=4))


CLインスタンスが稼働するKubernetes名前空間を確認する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
logger.info(instance.NAMESPACE)


CLインスタンスのコンテナイメージを確認する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
logger.info(instance.IMAGE)


CLインスタンスのコンテナログを取得する。
(operationフェーズで且つactive状態の場合のみ取得可能)

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
logs = await instance.tail(lines=1000, since_seconds=3600)


指定したManagedEntityを管理しているか確認する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
if instance.isTargetEntity(
    "NetworkSlice",
    nSId
):
    logger.info(f"{nsId} is my target")


CLインスタンスのcommissioning時に生成されるkubernetesマニフェストを取得する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
namespace, configmap, job, secret = instance.createManifest()


CLインスタンスのフェーズ遷移を待ち合わせる。

await ClosedLoop.waitfor(
    closedLoopInstanceUniqueId,
    expectedStates=["operation"]
)


CLインスタンスがoperation状態か判定する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
if instance.inOperation():
    logger.info(f"{closedLoopInstanceUniqueId} is operation phase")


CLインスタンスがoperation状態で且つループが稼働中かどうかを判定する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
if instance.inActive():
    logger.info(f"{closedLoopInstanceUniqueId} is active loop")


CLインスタンスのループを停止する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
await instance.deactivation()


CLインスタンスのループを再開する。

instance = await ClosedLoop.load(closedLoopInstanceUniqueId)
await instance.activation()


Note

Qmonus SDKではClosedLoopインスタンスそのもののメソッドでCRUDやactivation/deactivation操作が可能ですが、標準上は後述するCLG経由で操作することになっています。



CLG組込オブジェクト

CLGは、ClosedLoopインスタンスのCRUDやClosed Loop Execution Management Serviceで規定されているループの停止及び再開操作(activation/deactivation)、Closed Loop Usage Statistics Management Serviceで規定されているループの統計情報収集の有効化及び無効化、Closed Loop Reporting Serviceの設定操作を可能にする組込オブジェクトです。



以下は、ScenarioやATOMプラグイン内でCLG組込オブジェクトを操作しているサンプルです。このサンプルでは、3つのネットワークドメインのスライスサブネットを管理するCLインスタンスを生成し、その上位にネットワークスライスを管理するCLインスタンスを生成してスライスサブネットを管理する3つのCLインスタンスも管理対象に設定しています。最後にネットワークスライスのCLインスタンスを管理する上位サービスのCLインスタンスを生成してcommissioningしています。CLインスタンスを個別にcommissioningすることも可能ですがこのように階層化されている場合は最上位のインスタンスをcommissioningすることで全てのCLインスタンスのcommissioningが実行されます。尚、CLGはシングルトンインスタンスで実装されていますのでCLG.getinstance()でインスタンスを取得します。

tnNssi = await CLG.getinstance().preparation(
    closedLoopTypeName="TnNSSI",
    closedLoopTypeDescription="TN-MD NSSI closedloop",
    targetEntityList=[
        {
            "managedEntityType": "NetworkSliceSubnet",
            "managedEntityId": tnNSSId
        }
    ]
)
cnNssi = await CLG.getinstance().preparation(
    closedLoopTypeName="CnNSSI",
    closedLoopTypeDescription="CN-MD NSSI closedloop",
    targetEntityList=[
        {
            "managedEntityType": NetworkSliceSubnet,
            "managedEntityId": cnSSId
        }
    ]
)
dnNssi = await CLG.getinstance().preparation(
    closedLoopTypeName="DnNSSI",
    closedLoopTypeDescription="DN-MD NSSI closedloop",
    targetEntityList=[
        {
            "managedEntityType": NetworkSliceSubnet,
            "managedEntityId": dnSSId
        }
    ]
)
nsi = await CLG.getinstance().preparation(
    closedLoopTypeName="NSI",
    targetEntityList=[
        {
            "managedEntityType": "NetworkSlice",
            "managedEntityId": nSId
        },
        {
            "managedEntityType": "ClosedLoop",
            "managedEntityId": tnNssi.closedLoopInstanceUniqueId
        },
        {
            "managedEntityType": "ClosedLoop",
            "managedEntityId": cnNssi.closedLoopInstanceUniqueId
        },
        {
            "managedEntityType": "ClosedLoop",
            "managedEntityId": dnNssi.closedLoopInstanceUniqueId
        }
    ]
)
await CLG.getinstance().commissioning(
    nsi.closedLoopInstanceUniqueId
)


CLG組込オブジェクトの各種操作は以下を参考にしてください。

CLインスタンスを生成する

CLインスタンスが返却されます。

instance = await CLG.getinstance().preparation(closedLoopTypeName, **kwargs)


CLインスタンスの属性を更新する

返り値はありません。

await CLG.getinstance().override(closedLoopInstanceUniqueId, **kwargs)


CLインスタンスの稼働を開始する

返り値はありません。キーワード引数で_waitfor=Trueを指定した場合は、operationフェーズに遷移するまで制御を戻しません。

await CLG.getinstance().commissioning(closedLoopInstanceUniqueId, **kwargs)


CLインスタンスの稼働を終了する

返り値はありません。キーワード引数で_waitfor=Trueを指定した場合は、preparationフェーズに遷移するまで制御を戻しません。

await CLG.getinstance().decommissioning(closedLoopInstanceUniqueId)


CLインスタンスのループを一旦停止する(Execution MnS)

返り値はありません。クローズドループを停止しますが、operationフェーズを維持します。

await CLG.getinstance().deactivation(closedLoopInstanceUniqueId)


CLインスタンスのループを再開する(Execution MnS)

返り値はありません。クローズドループを再開しますが、operationフェーズを維持します。

await CLG.getinstance().activation(closedLoopInstanceUniqueId)


CLインスタンスを削除する

返り値はありません。CLインスタンスを完全に削除します。

await CLG.getinstance().destroy(closedLoopInstanceUniqueId)


CLインスタンスを取得する

CLインスタンスが返却されます。

instance = await CLG.getinstance().get(closedLoopInstanceUniqueId)


CLインスタンスのリストを取得する

CLインスタンスのリストが返却されます。CLインスタンスの作成順でoffsetとlimitが指定できます。尚、v22.7LTS時点では検索キーはサポートされておらず将来機能追加予定です。

instances = await CLG.getinstance().retrieve(
    offset=0,
    limit=500
)


CLインスタンスの統計情報収集を有効化する(Usage Statistics MnS)

CLインスタンスの統計情報収集を有効化します。有効化した時点からの統計情報となります。デフォルトは無効状態です。

await CLG.getinstance().enableStatistics(closedLoopInstanceUniqueId)


CLインスタンスの統計情報収集を無効化する(Usage Statistics MnS)

CLインスタンスの統計情報収集を無効化します。収集されていた統計情報はクリアされます。

await CLG.getinstance().disableStatistics(closedLoopInstanceUniqueId)


CLインスタンスの統計情報を取得する(Usage Statistics MnS)

CLインスタンスの統計情報を取得します。返却される統計情報はdict型のオブジェクトとなります。

stats = await CLG.getinstance().getStatistics(closedLoopInstanceUniqueId)


Note

統計情報は、以下のスキーマで返却されます。

{
    "type": "object",
    "required": [
        "closedLoopInstanceUniqueId",
        "rpm",
        "operating rate",
        "coordination"
    ],
    "properties": {
        "closedLoopInstanceUniqueId": {
            "type": "string"
        },
        "rpm": {
            "type": "number",
            "description": "クローズドループの回転数/分"
        },
        "operating rate": {
            "type": "number",
            "description": "クローズドループの可動率"
        },
        "coordination": {
            "type": "object",
            "required": [
                "total number of executions",
                "avg processing time"
            ],
            "properties": {
                "total number of executions": {
                    "type": "integer",
                    "description": "コーディネーション実行回数"
                },
                "avg processing time": {
                    "type": "number",
                    "description": "コーディネーション平均処理時間"
                }
            }
        }
    }
}


Closed Loop Reporting Service

Closed Loop Reporting Serviceは、クローズドループに関する様々な情報をレポートするサービスです。Qmonus SDKでは、以下の表に示すレポートを提供しています。 プラグイン開発者が各種ClosedLoopComponent内でカスタムレポートを作成して通知することも可能です。

レポート種別 重症度 デフォルト通知状態 説明
lifeCyclePhase info 有効 ライフサイクルフェーズが移行したことをレポートします。
executionMnS warn 有効 CLループ停止もしくは再開が実行されたことをレポートします。
statisticsMnS notice 有効 統計情報収集が有効化もしくは無効化されたことをレポートします。
systemIncident fatal 有効 操作がシステム的な理由で失敗したことをレポートします。
operatorIncident error 有効 各種ステージのFunctionなどユーザプラグインのスクリプトエラーなどをレポートします。
goalChange notice 有効 CLの目標が変更されたことをレポートします。
policyChange notice 有効 CLのポリシーが変更されたことをレポートします。
stageProgress debug 無効 CLのステージ進行をレポートします。
custom info 有効 ユーザプラグインで自由に発行できるレポートです。重症度のデフォルトはinfoです。

カスタムレポートの発行方法は以下のようなイメージです。CLインスタンスのcustomReportメソッドは、第一引数にレポートメッセージを与える必要があります。キーワード引数としてseverityを指定できます。severityは、debuginfonoticewarnerrorfatalのいづれかを推奨します。

async def sampleStage(self):
    await self.customReport(
        "Hello, Closed Loop Reporting Service",
        severity="debug"
    )


Note

各種レポートはQmonus SDKポータルやGrafanaで監視することができます。また、各種レポートの通知状態は、Qmonus SDKポータルから有効/無効化の設定が可能です。CLAはトラブル解析の難易度が高いため、少なくともデフォルト値での運用をお勧めします。



CLC組込オブジェクト

CLCは複数のCLインスタンス間の調整が必要な際に使用します。CLインスタンスは独立して駆動している関係上、アクションを調整せずに実行してしまうと管理リソースの状態矛盾を発生させたり、競合してしまうことが考えられます。このような問題を解決するため、ある一定の時間ウィンドウを設けて同一リソースに対するアクション要求を蓄積し、最優先の要求にアクション内容をマージしたり、不要なアクションを除去するような調整機能を提供することができます。
CLCに調整を依頼する際の呼び出し方法のサンプルを以下に記載しています。第一引数には、調整を依頼するCLインスタンスの識別子を指定します。第二引数にはFunctionプラグイン名を指定します。このFunctionプラグインが調整部の実行実態となります。第三引数には調整したいデータを指定します。targetEntityListキーワード引数には調整対象となる管理エンティティリストを指定します。省略した場合は、CLインスタンス自身に設定されているtargetEntityList全体が調整対象となります。最後にmeetingDelayTimeキーワード引数には複数のCLインスタンスからの調整要求を待ち合わせる時間(秒単位)を指定します。デフォルトは5秒間です。ここで指定されたmeetingDelayTime後に第二引数で指定されたFunctionプラグインが実行され、調整が開始されます。尚、調整はCLインスタンスが独立駆動しているPod上で実行されます。そのため、同一リソースに対する同一の調整(=Functionプラグイン)を先に開始したCLインスタンスの処理結果が後続のCLインスタンスに調整結果として返却される動作となります。よってmeetingDelayTimeのウィンドウ内にエントリした調整要求は唯一の調整結果として整合する仕組みとなります。

result = await CLC.getinstance().coordination(
    self.closedLoopInstanceUniqueId,
    "preExecutionCoordination",
    decisionDatas,
    targetEntityList=[
        ManagedEntity(
            "NetworkSliceSubnet",
            nSSId
        )
    ],
    meetingDelayTime=30
)


Coordination動作の流れは以下のフロー図を参考にしてください。




Coordinationの実装について

上述した通り、Coordination処理自体はFunctionプラグインで実装します。Coordinationの開始時に呼び出されるFunctionは必ずcoordinationItemsという引数を宣言する必要があります。coordinationItemsは、各CLインスタンスが調整要求をする際に調整対象として渡したデータが格納されている辞書です。 coordinationItemsのkeyはclosedLoopInstanceUniqueIdです。valueはdict型でkeyとしてclosedLoopInstanceUniqueId、coordinationFunctionName、coordinationStatus、coordinationInfo、targetEntityListが存在します。coordinationStatusは、WaitingProcessingProcessedのいづれかですが、Functionプラグインが呼び出された際に参照すると必ずProcessing状態です。coordinationInfoは、CLインスタンスが調整要求をした調整対象データそのものが格納されます。Functionプラグインの調整処理の結果はこのcoordinationInfoを上書きすることで各CLインスタンスの調整結果を決定することができます。尚、coordinationItemsに含まれる調整データは同一のcoordinationFunctionNameによる調整要求であり、且つtargetEntityListに含まれている調整対象エンティティが一部でも重複している調整要求が全て対象となります。

Functionプラグインの最低限のサンプルイメージを以下に記載します。全てのCLインスタンスには調整結果として{"hello": "coordination"}という辞書が返却されます。

async def preExecutionCoordination(coordinationItems):
    for k, v in coordinationItems.items():
        coordinationItems[k]["coordinationInfo"] = {"hello": "coordination"}


Tip

ClosedLoopComponent、onSpawnHook、preOperationComponent、postOperationComponent、Coordinationの単体テストについては処理実態がFunctionプラグインとなるため、Functionのユニットテストで動作確認を行ってください。



CLCが提供するその他のユーティリティメソッド

指定したclosedLoopInstanceUniqueIdを管理している上位のクローズドループインスタンスのclosedLoopInstanceUniqueIdリストを取得する。階層化されたCLインスタンス間でエスカレーションする際に上位インスタンスの取得に役立ちます。

supervisors = await CLC.getinstance().getEscalationClosedLoops(
    closedLoopInstanceUniqueId
)


指定したclosedLoopInstanceUniqueIdを管理している上位のクローズドループインスタンスが管理している他のclosedLoopInstanceUniqueIdリストを取得する。階層化されたCLインスタンス間で同じ親を持つpeer関係のクローズドループインスタンスの取得に役立ちます。

peers = await CLC.getinstance().getPeerClosedLoops(
    closedLoopInstanceUniqueId
)


指定したclosedLoopInstanceUniqueIdリストの中で最も優先度の高いclosedLoopInstanceUniqueIdを取得します。優先度はClosedLoopインスタンスのclosedLoopPriorityフィールドに設定されている数値です。数値が高いほど優先度が高いと判断します。尚、同一優先度のclosedLoopInstanceUniqueIdが複数存在する場合はClosedLoopインスタンスの生成時刻が最も古いものを最優先として返却します。

winner = await CLC.getinstance().priorityWinner(
    closedLoopInstanceUniqueIds
)



チュートリアル

Qmonus SDKのCLA機能のチュートリアルを開始します。例題として3つの管理ドメインからのPM(Performance Monitoring)/FM(Fault Monitoring)メッセージをインテグレーションファブリックを経由して受信するNetworkSliceSubnetのクローズドループとそれらを束ねるNetworkSliceのクローズドループを階層関係で稼働させるケースを取り上げます。インテグレーションファブリックには、Google Cloud Pub/Subを利用します。



クローズドループ共通機能の作成

チュートリアルで扱うクローズドループは全てインテグレーションファブリックに接続してメッセージングするため、Cloud Pub/Subへの接続、切断に伴う処理は共通化できます。これらの処理は以下の図に示すようにCLインスタンスの生成時とクローズドループの開始、終了時の処理に集約します。

理解しておくべき点としてCLインスタンスの生成は、Qmonus SDKが稼働しているPod上で行われるため、onSpawnHookはQmonus SDKのランタイム環境でマウントされているSecret情報を扱うことができます。一方、CLインスタンスのcommissioningが実行されるとCLインスタンスはkubernetes Jobによって生成される別のPodで駆動します。そのため、このPodからQmonus SDK本体の環境情報を得ることができません。onSpawnHookはPodを跨いでSecret情報を伝搬するためにCLインスタンスの_secretフィールドに読み込んだSecretを設定し、実行Podが変わってもその情報を得るようにする仕組みです。よってpreOperationComponent、postOperationComponentは、Jobで生成されたPod側で動作しますがonSpawnHookで伝搬されたSecretを読み込むことができます。



onSpawnHookの作成

onSpawnHookをFunctionプラグインで作成します。第一引数にはCLインスタンスが与えられるため、必ずselfを記述するようにしてください。ここではQmonus LabでPUBSUB_SA_PATHという環境変数でService Accountファイルをマウントしている前提です。Service Accountファイルを開いて読み込んだらsecret辞書に格納してCLインスタンスの_secretフィールドに設定します。CLインスタンスの_secretフィールドはcommissioning時にkubernetesのSecretリソースとしてデプロイされ、Job起動されたCLインスタンスPodにマウントされます。

category: CLA Tutorial
name: onSpawn
code: |
  async def onSpawn(self):
      import os
      import aiofiles

      secret = {
          "sa.json": None
      }
      async with aiofiles.open(
          os.environ["PUBSUB_SA_PATH"], mode="r"
      ) as f:
          data = await f.read()
          secret["sa.json"] = base64.b64encode(
              data.encode()
          ).decode("utf-8")
      self._secret = secret


preOperationComponentの作成

preOperationComponentを作成します。preOperationComponentの処理実装自体はFunctionプラグインとして作成します。ここではconnectIntegrationFabricという関数名で作成します。第一引数にはCLインスタンスが与えられるため、必ずselfを記述するようにしてください。
最初にcommissioningによって生成されたkubernetes Secretを読み込みます。マウントポイントは、/root/secrets配下です。 また、topic名は、/projects/{project}/topics/E2EMD-{nSSId}、同様にsubscription名は、/projects/{project}/subscriptions/E2EMD-{nSSId}とします。
Secretから読み込んだServiceAccount、projectIdとtopic名、subscription名についてはpostOperationComponentでも同様に必要となるので冗長処理排除の観点からCLインスタンスが持つmetadataに保存しておきます。

category: CLA Tutorial
name: connectIntegrationFabric
code: |
  async def connectIntegrationFabric(self):
      import os
      import aiofiles
      from gcloud.aio.pubsub import PublisherClient, SubscriberClient
      from gcloud.aio.pubsub import PubsubMessage, SubscriberMessage

      (SA, PJ) = ("/root/secrets/sa.json", None)
      async with aiofiles.open(SA, mode="r") as f:
          _ = await f.read()
          PJ = MU(json.loads(_)).project_id

      key = "nSId" if "nSId" in self.metadata else "nSSId"
      topicName = f"E2EMD-{self.metadata[key]}"
      topic = f"projects/{PJ}/topics/{topicName}"
      sub = f"projects/{PJ}/subscriptions/{topicName}"

      if not self.metadata:
          self.metadata = {}
      (
          self.metadata["SA"],
          self.metadata["PJ"],
          self.metadata["topic"],
          self.metadata["topicName"],
          self.metadata["sub"]
      ) = (SA, PJ, topic, topicName, sub)

      await self.save()

      # topics作成
      notExists = True
      async with PublisherClient(service_file=SA) as client:
          topics = await client.list_topics(f"projects/{PJ}")
          for i in topics["topics"]:
              if i["name"] == topic:
                  notExists = False
                  break

      if notExists:
          retry = 0
          while retry < 3:
              async with PublisherClient(service_file=SA) as client:
                  try:
                      await client.create_topic(topic)

                      while Runtime.running():
                          topics = await client.list_topics(
                              f"projects/{PJ}"
                          )
                          for i in topics["topics"]:
                              if i["name"] == topic:
                                  break
                          else:
                              continue
                          break
                  except Exception as e:
                      if retry < 3:
                          retry += 1
                          await asyncio.sleep(1)
                          continue

                      logger.error(traceback.format_exc())
                      return

      # subscription作成
      notExists = True
      async with SubscriberClient(service_file=SA) as client:
          try:
              await client.get_subscription(sub)
          except Exception as e:
              logger.info(f"Create because subscription does not exist {sub}")
          else:
              notExists = False

      if notExists:
          retry = 0
          while retry < 3:
              async with SubscriberClient(service_file=SA) as client:
                  try:
                      await client.create_subscription(
                          sub,
                          topic
                      )
                  except Exception as e:
                      if retry < 3:
                          retry += 1
                          await asyncio.sleep(1)
                          continue

                      logger.error(traceback.format_exc())


Functionプラグインを作成したらClosedLoopComponentにリンクします。これでClosedLoopからClosedLoopComponentとして利用可能な状態になります。

category: CLA Tutorial
closedLoopComponentDescription: Connect Integration Fabric
closedLoopComponentName: connect
producedManagementCapabilitiesList:
  - connectIntegrationFabric


postOperationComponentの作成

postOperationComponentを作成します。metadataからSAや削除対象のtopic名、subscription名を取得してCloud Pub/Subのリソースを削除します。

category: CLA Tutorial
name: disconnectIntegrationFabric
code: |
  async def disconnectIntegrationFabric(self):
      from gcloud.aio.pubsub import PublisherClient, SubscriberClient

      async with SubscriberClient(
          service_file=self.metadata["SA"]
      ) as client:
          await client.delete_subscription(
              self.metadata["sub"]
          )

      async with PublisherClient(
          service_file=self.metadata["SA"]
      ) as client:
          await client.delete_topic(
              self.metadata["topic"]
          )


Functionプラグインを作成したらClosedLoopComponentにリンクします。

category: CLA Tutorial
closedLoopComponentDescription: Disconnect Integration Fabric
closedLoopComponentName: disconnect
producedManagementCapabilitiesList:
  - disconnectIntegrationFabric


監視データの仕様について

ClosedLoopComponentの作成の前に、チュートリアルで扱う監視データフォーマットを規定しておきます。簡易化のため、FMデータのみを扱うこととし、メッセージフォーマットは、3GPP TS 28.532に規定されているnotifyThresholdCrossingを使用します。本来E2EOドメインでは様々なドメイン(3GPP準拠、非準拠問わず)との接続を考慮する必要がありますがそれらは本ドキュメントの関心事ではありません。

notifyThresholdCrossingのメッセージフォーマットは以下のjsonスキーマに従うものとします。

{
    "type": "object",
    "required": [
        "href",
        "notificationId",
        "notificationType",
        "eventTime",
        "systemDN",
        "observedPerfMetricName",
        "observedPerfMetricValue",
        "observedPerfMetricDirection",
        "thresholdValue",
        "hysteresis"
    ],
    "properties": {
        "href": {
            "type": "string"
        },
        "notificationId": {
            "type": "integer"
        },
        "notificationType": {
            "type": "string"
        },
        "eventTime": {
            "type": "string"
        },
        "systemDN": {
            "type": "string"
        },
        "observedPerfMetricName": {
            "type": "string"
        },
        "observedPerfMetricValue": {
            "type": "string"
        },
        "thresholdValue": {
            "type": "integer"
        },
        "hysteresis": {
            "type": "integer"
        },
        "monitorGranularityPeriod": {
            "type": "integer"
        },
        "additionalText": {
            "type": "string"
        }
    }


notifyThresholdCrossingのメッセージ例は以下の通りです。observedPerfMetricNameはbandwidthUtilizationUpとbandwidthUtilizationDownの2値とし、bandwidthUtilizationUpは上り帯域、bandwidthUtilizationDownは下り帯域を意味します。observedPerfMetricDirectionはUPとDOWNの2値とし、UPは上限閾値超過、DOWNは下限閾値超過を意味します。
つまり、observedPerfMetricNameがbandwidthUtilizationUp、且つobservedPerfMetricDirectionがUPで通知された場合は、上り方向の帯域使用率の上限閾値超過と解釈します。

{
    "href": "http://dummy.ntt.com/productInventoryManagement/v1/product",
    "notificationId": 1,
    "notificationType": "notifyThresholdCrossing",
    "eventTime": "2022-02-10T01:58:37.000+09:00",
    "systemDN": "DC=dummy.ntt.com",
    "observedPerfMetricName": "bandwidthUtilizationUp",
    "observedPerfMetricValue": 90,
    "observedPerfMetricDirection": "UP",
    "thresholdValue": 80,
    "hysteresis": 0,
    "monitorGranularityPeriod": 60
}


ClosedLoopの目標

ClosedLoopの目標とは、維持すべき管理対象リソースの仕様を記述した宣言オブジェクトです。実態としてはCLインスタンスのclosedLoopGoalフィールドに定義される辞書オブジェクトとなります。目標は宣言的であるべきです。ETSI ZSMでは、ネットワークスライスやネットワークスライスサブネットの仕様は、3GPP TS 28.541で規定されるNRM(Network Resource Model)に基づき、ServiceProfileやSliceSubnetProfileで管理されます。本チュートリアルでは、これらのプロファイルオブジェクトを辞書化した表現を目標として使用することとします。

NSSIの目標

NSSIの目標は以下のスキーマで管理します。NRMには、latencyやcoverageArea、PktSizeなどのプロファイル属性が定義されていますが、本チュートリアルでは使用しないため、最低限のスループット属性のみに絞って実装します。本来トップのキー名の後半部にSubnetが付きますが、簡易化のためNSIと同じ名称に統一している点をご了承ください。

{
    "type": "object",
    "requred": [
    ],
    "properties": {
        "dLThptPerSlice": {
            "type": "object",
            "required": [
                "guaThpt",
                "maxThpt"
            ],
            "properties": {
                "guaThpt": {
                    "type": "number"
                },
                "maxThpt": {
                    "type": "number"
                }
            }
        }
        "uLThptPerSlice": {
            "type": "object",
            "required": [
                "guaThpt",
                "maxThpt"
            ],
            "properties": {
                "guaThpt": {
                    "type": "number"
                },
                "maxThpt": {
                    "type": "number"
                }
            }
        }
    }
}


NSIの目標

NSIの目標は以下のスキーマで管理します。NSSI同様にNRMには、多くのプロファイル属性が定義されていますが、本チュートリアルでは使用しないため、最低限のスループット属性のみに絞って実装します。

{
    "type": "object",
    "requred": [
    ],
    "properties": {
        "dLThptPerSlice": {
            "type": "object",
            "required": [
                "guaThpt",
                "maxThpt"
            ],
            "properties": {
                "guaThpt": {
                    "type": "number"
                },
                "maxThpt": {
                    "type": "number"
                }
            }
        }
        "uLThptPerSlice": {
            "type": "object",
            "required": [
                "guaThpt",
                "maxThpt"
            ],
            "properties": {
                "guaThpt": {
                    "type": "number"
                },
                "maxThpt": {
                    "type": "number"
                }
            }
        }
    }
}


ClosedLoopのポリシー

ClosedLoopのポリシーとは、目標が維持できなくなった場合に遵守すべき行動ポリシーです。実態としてはCLインスタンスのclosedLoopPolicyフィールドに定義される辞書オブジェクトとなります。ポリシーは宣言的であるべきです。ETSI ZSMでは、ポリシーの表現は規定されていません。本チュートリアルでは使用帯域の上限閾値超過、下限閾値超過の検出をトリガーにスライスの帯域設定を追従させるアクションを自動化することが可能なよう独自のポリシー表現を定義します。またチュートリアルではNSSIのCLにはポリシーを設定せず、NSIのCLに設定することとします。これによってNSSIのDecisionステージでは自身の目標であるプロファイルの差分まで検出するものの行動指針が示されていないため、上位ループであるNSIのCLインスタンスにエスカレーションすることになります。NSIのCLインスタンスのDecisionステージでは以下のポリシーを適用し、スライス全体のmaxThptの変更アクションを決定することになります。

{
    "uLThptPerSlice": {
        "maxThpt": {
            "upperLimit": 700000,
            "lowerLimit": 40000,
            "increment": 50000,
            "decrement": 50000
        }
    },
    "dLThptPerSlice": {
        "maxThpt": {
            "upperLimit": 700000,
            "lowerLimit": 400000,
            "increment": 50000,
            "decrement": 50000
        }
    }
}


Note

上記のポリシーの解釈方法として、例えば上り帯域の上限閾値を超過した場合、upperLimitに到達するまでincrementに規定された帯域分増速可能する。逆に上り帯域の加減閾値を超過した場合、lowerLimitに到達するまでdecrementに規定された帯域分減速可能とするという意味解釈をするものとします。

ClosedLoopComponentの作成

ClosedLoopComponentを作成します。本チュートリアルでは、NSI、NSSIのクローズドループを作成しますが、簡易化のためClosedLoopComponentとして処理内容が共通化できるものについては集約して作成することとします。

Monitoringステージ

MonitoringステージのClosedLoopComponentを作成します。まずFunctionプラグインでMonitoringの実装を行います。尚、NSI/NSSI共にCloud Pub/Subからsubscribeしたデータをリスト化して返却するのみの処理であることから共用関数として作成します。
Functionプラグインの実装例は以下の通りです。

category: CLA Tutorial
name: monitoringStage
code: |
  async def monitoringStage(self, changedGoalStatement):
      from gcloud.aio.pubsub import SubscriberClient, SubscriberMessage
      import backoff

      class SubscriberClientWithBackoff(SubscriberClient):
          @backoff.on_exception(
              backoff.expo,
              aiohttp.ClientResponseError,
              max_tries=5,
              jitter=backoff.full_jitter
          )
          async def pull(self, *args, **kwargs):
              return await super().pull(*args, **kwargs)

      messages = []
      try:
          async with SubscriberClientWithBackoff(
              service_file=self.metadata["SA"]
          ) as client:
              _: List[SubscriberMessage] = await client.pull(
                  self.metadata["sub"],
                  max_messages=128
              )
              if _:
                  await client.acknowledge(
                      self.metadata["sub"],
                      [i.ack_id for i in _]
                  )
                  messages.extend(_)
      except Exception:
          logger.info("backoff timedout, continue")

      if changedGoalStatement:
          """Goalが変化した場合は滞留イベントの信憑性が低いと判断し、廃棄する
          """
          return []

      [
          logger.info(f"received raw data: {i.data}") for i in messages
      ]
      return [json.loads(i.data) for i in messages]


Note

第一引数のselfは必須でCLインスタンスが代入されてきます。第二引数のchangedGoalStatementに注意してください。原則ClosedLoopComponentのinputDataListに指定可能な変数名は、CLインスタンスの名前空間にある情報です。名前空間の情報とはClosedLoopComponentのoutputDataListに定義されたClosedLoopComponentが返却するデータ群です。ここで定義しているMonitoringステージの処理は最初に動作するClosedLoopComponentであり、名前空間には変数が存在しないはずです。しかしながらchangedGoalStatementは特別な変数でCLインスタンスがoperationフェーズでループする際に自動的にclosedLoopGoalをチェックして前回のループ時から目標が変化していればchangedGoalStatement=True、変化していなければchangedGoalStatement=Falseという変数を名前空間に構築します。Monitoringステージでは監視データをsubscribeしていますが、CLインスタンスのアクションやオペレータによってCLとしての目標が変化した場合にその間に監視していたデータの信憑性が低いことからchangedGoalStatement=Trueの場合はsubscribeしたデータを破棄して次の周期の監視データが新たな目標下での適正なデータとして扱うような処理を入れておくことをお勧めします。


Functionプラグインを作成したらClosedLoopComponentにリンクします。

category: CLA Tutorial
closedLoopComponentDescription: Monitoring Stage
closedLoopComponentName: monitoring
inputDataList:
  - changedGoalStatement
outputDataList:
  - monitoringDatas
producedManagementCapabilitiesList:
  - monitoringStage


Analysisステージ

AnalysisステージのClosedLoopComponentを作成します。まずFunctionプラグインでAnalysisの実装を行います。AnalysisステージはMonitoringステージで作成されたmonitoringDatasを引数として受け取ります。monitoringDatasが空の場合やCLの目標が設定されていない場合は空データを返却します。
monitoringDatasが存在する場合は、メッセージを解析し、CL目標とのギャップを抽出します。ギャップ情報はMonitoringDatasのエントリにclosedLoopGoalGapというフィールドで設定し、これらのリストをanalysisDatasという形で返却します。 notifyThresholdCrossing内部関数は、closedLoopGoalからギャップ部を抜き出すだけの処理ですが、NSI/NSSIで共用できるようにキー名のハンドリングを追加で行っています。
尚、本チュートリアルは簡易化のためにこのような実装としていますのでメッセージングの設計は柔軟に行っていただいて構いません。

category: CLA Tutorial
name: analysisStage
code: |
  async def analysisStage(self, monitoringDatas):
      def notifyThresholdCrossing(o, closedLoopGoal):
          prefix = "u" if o.observedPerfMetricName.endswith("Up") else "d"
          key = f"{prefix}LThptPerSlice"
          if key in closedLoopGoal:
              return {
                  key: closedLoopGoal[key]
              }
          return {}

      analysisDatas = []
      if not monitoringDatas or not self.closedLoopGoal:
          return analysisDatas

      for i in monitoringDatas:
          o = MU(i)
          closedLoopGoalGap = notifyThresholdCrossing(
              o,
              self.closedLoopGoal
          )
          if not closedLoopGoalGap:
              continue

          o.closedLoopGoalGap = closedLoopGoalGap
          analysisDatas.append(o.dictionary)

      return analysisDatas


Functionプラグインを作成したらClosedLoopComponentにリンクします。

category: CLA Tutorial
closedLoopComponentDescription: Analysis Stage
closedLoopComponentName: analysis
inputDataList:
  - monitoringDatas
outputDataList:
  - analysisDatas
producedManagementCapabilitiesList:
  - analysisStage


Decisionステージ

DecisionステージのClosedLoopComponentを作成します。まずFunctionプラグインでDecisionの実装を行います。DecisionステージはAnalysisステージで作成されたanalysisDatasを引数として受け取ります。analysisDatasが空の場合は空データを返却します。closedLoopPolicyが設定されていない場合は、analysisDatasを加工せず、decisionDatasとして返却します。closedLoopPolicyが設定されている場合は、analysisDatasにポリシーを適用します。

category: CLA Tutorial
name: decisionStage
code: |
  async def decisionStage(self, analysisDatas):
      async def notifyThresholdCrossing(o, closedLoopPolicy):
          def applyPolicy(oldValue, policy, action):
              logger.info(f"Apply policy {oldValue} {action} {policy}")
              if action == "increment":
                  newValue = oldValue + policy[action]
                  if "upperLimit" in policy:
                      if newValue <= policy["upperLimit"]:
                          return newValue
              else:
                  newValue = oldValue - policy[action]
                  if "lowerLimit" in policy:
                      if newValue >= policy["lowerLimit"]:
                          return newValue
              return oldValue

          prefix = "u" if o.observedPerfMetricDirection == "UP" else "d"
          action = None

          if o.observedPerfMetricDirection == "UP":
              action = "increment"
          else:
              action = "decrement"

          applied = False
          for k, v in o.closedLoopGoalGap.items():
              policy = closedLoopPolicy.get(k, None)
              if not policy:
                  continue

              for name, value in o.closedLoopGoalGap[k].items():
                  if name in policy:
                      newValue = applyPolicy(
                          value,
                          policy[name],
                          action
                      )
                      if newValue != o.closedLoopGoalGap[k][name]:
                          o.closedLoopGoalGap[k][name] = newValue
                          applied = True
          if applied:
              return o.closedLoopGoalGap.dictionary

      decisionDatas = []
      if not analysisDatas:
          return decisionDatas

      if not self.closedLoopPolicy:
          return analysisDatas

      for i in analysisDatas:
          o = MU(i)

          closedLoopGoalGap = await notifyThresholdCrossing(
              o,
              self.closedLoopPolicy
          )
          if not closedLoopGoalGap:
              continue

          decisionDatas.append(closedLoopGoalGap)

      return decisionDatas


Functionプラグインを作成したらClosedLoopComponentにリンクします。

category: CLA Tutorial
closedLoopComponentDescription: Decision Stage
closedLoopComponentName: decision
inputDataList:
  - analysisDatas
outputDataList:
  - decisionDatas
producedManagementCapabilitiesList:
  - decisionStage


Executionステージ

ExecutionステージのClosedLoopComponentを作成します。尚、ここまでのステージはNSI/NSSI双方のループで共用可能な実装でしたが、NSIのExecutionステージはMonitoringステージの次に実行されるため、共用できません。inputDataListが異なるためです。まずNSSIのExecutionステージから作成していきます。

NSSIのExecutionステージ

FunctionプラグインでExecutionの実装を行います。ExecutionステージはDecisionステージで作成されたdecisionDatasを引数として受け取ります。decisionDatasが空の場合は何もせず処理を抜けます。closedLoopPolicyが設定されていない場合は、上位のクローズドループにエスカレーションします。これはNSSIのクローズドループとして本コンポーネントが利用された場合の処理となります。考慮すべきはNSSIのクローズドループは同一のNSIのクローズドループ配下で駆動している点です。NSSIのそれぞれのループがバラバラにエスカレーションすると早い者勝ちでアクションが適用されてしまう可能性があり、NSSMF側の監視周期のバラつきによって矛盾する制御結果を招く恐れもあります。このようにクローズドループ間の調整が必要なケースではCLCによる調整が有効となります。本チュートリアルにおいては、NSSIのCLインスタンスからNSIのCLインスタンスへのエスカレーションを調整する例を実装してみます。

category: CLA Tutorial
name: executionStage
code: |
  async def executionStage(self, decisionDatas):
      from gcloud.aio.pubsub import PublisherClient, PubsubMessage

      if not decisionDatas:
          return

      result = await CLC.getinstance().coordination(
          self.closedLoopInstanceUniqueId,
          "preExecutionCoordination",
          decisionDatas,
          meetingDelayTime=15
      )
      if not result:
          return

      logger.info("Escalates because the policy is not set")

      escalationClosedLoops = await CLC.getinstance().getEscalationClosedLoops(
          self.closedLoopInstanceUniqueId
      )
      if escalationClosedLoops:
          escalatioMessages = [
              PubsubMessage(
                  json.dumps(result),
                  attribute="value"
              )
          ]
          async with PublisherClient(
              service_file=self.metadata["SA"]
          ) as client:
              for i in escalationClosedLoops:
                  supervisor = await ClosedLoop.load(i)

                  escalationTopic = client.topic_path(
                      self.metadata["PJ"],
                      supervisor.metadata["topicName"]
                  )
                  await client.publish(
                      escalationTopic,
                      escalatioMessages
                  )


Functionプラグインを作成したらClosedLoopComponentにリンクします。

category: CLA Tutorial
closedLoopComponentDescription: Execution Stage
closedLoopComponentName: execution
inputDataList:
  - decisionDatas
producedManagementCapabilitiesList:
  - executionStage


Coordination機能の実装

次にエスカレーションメッセージの競合を回避してアクション内容をマージするためのcoordination関数を作成します。Functionプラグインで以下の関数を定義します。coordinationItemsで与えられた調整アイテムの中から最も優先度の高いクローズドループを選択して、そのクローズドループの調整内容に他のクローズドループのアクション内容をマージしています。ここではチュートリアルの単純化のためこのようなシンプルな実装となっている点にご理解ください。

category: CLA Tutorial
name: preExecutionCoordination
code: |
  async def preExecutionCoordination(coordinationItems):
      if not coordinationItems:
          return

      """優先度の勝者を決定する
      """
      winner = await CLC.getinstance().priorityWinner(
          list(coordinationItems.keys())
      )

      gap = {}
      [
          gap.update(i) for i in coordinationItems[winner]["coordinationInfo"]
      ]

      for k, v in coordinationItems.items():
          if k == winner:
              continue

          [
              gap.update(i) for i in coordinationItems[k]["coordinationInfo"]
          ]
          """優先度の敗者の結果を消去する
          """
          coordinationItems[k]["coordinationInfo"] = None

      """優先度の勝者の結果を上書きする
      """
      coordinationItems[winner]["coordinationInfo"] = gap


NSIのExecutionステージ

FunctionプラグインでExecutionの実装を行います。ExecutionステージはMonitoringステージで作成されたmonitoringDatasを引数として受け取ります。monitoringDatasが空の場合は何もせず処理を抜けます。NSIのExecutionステージでは既にアクション内容がマージされた要求がmonitoringDatasに入ってくるため、このデータを元にNSMFに対してスライスの変更要求を送信する想定としています。チュートリアルではNSMF相当の実装にはフォーカスしませんので受信したデータをログ出力するのみと実装とします。

category: CLA Tutorial
name: executionStageForNSI
code: |
  async def executionStageForNSI(self, monitoringDatas):
      if not monitoringDatas:
          return

      logger.info(f"Send slice change request to NSMF...\n{monitoringDatas}")


Functionプラグインを作成したらClosedLoopComponentにリンクします。

category: CLA Tutorial
closedLoopComponentDescription: Execution Stage for NSI
closedLoopComponentName: executionForNSI
inputDataList:
  - monitoringDatas
producedManagementCapabilitiesList:
  - executionStageForNSI


ClosedLoopTypeの作成

ClosedLoopTypeは、クローズドループをテンプレート化することができます。これまで作成してきた各種プラグインが揃っていればプログラムで動的にClosedLoopを組み立てることが可能ですが、通常は今回のケースのようにクローズドループは特定の種別に分類可能であり、パイプラインは同じものになることが大半であると想定されます。ここでは、NSSIとNSI用の2種類のClosedLoopTypeを作成します。

NSSI用のClosedLoopType作成

NSSIでは、4つのステージを定義します。また管理可能なエンティティとしてNetworkSlice、NetworkSliceSubnetを指定します。ZSMのモデル的にはNetworkSliceSubnetのみが妥当ですが、本チュートリアルではNSIのクローズドループに対してNSSIのクローズドループ間で調整したエスカレーションメッセージを作る必要があり、その調整の際にクローズドループ間で共通の管理エンティティが定義されている必要があるため、ここでNetworkSliceについても管理可能と定義しています。また、onSpawnHookやpreOperationComponent、postOperationComponentについても事前設定しておきます。closedLoopPriorityについてはチュートリアル的には優劣つける必要性がないため、一律30という数値を設定しています。

category: CLA Tutorial
closedLoopComponentList:
  - monitoring
  - analysis
  - decision
  - execution
closedLoopGoal: {}
closedLoopPolicy: {}
closedLoopPriority: 30
closedLoopTypeDescription: NSSI closedloop
closedLoopTypeName: NSSI
manageableEntityList:
  - NetworkSlice
  - NetworkSliceSubnet
metadata: {}
onSpawnHook: onSpawn
postOperationComponent: disconnect
preOperationComponent: connect


NSI用のClosedLoopType作成

NSIでは、2つのステージを定義します。また管理可能なエンティティとしてClosedLoop、NetworkSliceを指定します。NSIのクローズドループはNSSIのクローズドループを管理エンティティとして関連付けます。これによってcommissioningやdecommissioningはNSIクローズドループからNSSIクローズドループに伝搬します。

category: CLA Tutorial
closedLoopComponentList:
  - monitoring
  - executionForNSI
closedLoopPriority: 30
closedLoopTypeDescription: NSI closedloop
closedLoopTypeName: NSI
manageableEntityList:
  - ClosedLoop
  - NetworkSlice
onSpawnHook: onSpawn
postOperationComponent: disconnect
preOperationComponent: connect


動作確認

いよいよ動作確認をしていきます。最初にクローズドループを生成してcommissioningで稼働を開始する必要があります。以下のFunctionプラグインは動作確認のため、クローズドループを生成して開始するスクリプトです。

category: CLA Tutorial
name: testPreparation
code: |
  async def testPreparation():
      (nSId, tnNSSId, cnNSSId, dnNSSId) = (
          uuid.uuid1().hex,
          uuid.uuid1().hex,
          uuid.uuid1().hex,
          uuid.uuid1().hex
      )

      nSSIGoal = {
          "dLThptPerSlice": {
              "guaThpt": 200000,
              "maxThpt": 400000
          },
          "uLThptPerSlice": {
              "guaThpt": 20000,
              "maxThpt": 40000
          }
      }

      nSIGoal = {
          "dLThptPerSlice": {
              "guaThpt": 200000,
              "maxThpt": 400000
          },
          "uLThptPerSlice": {
              "guaThpt": 20000,
              "maxThpt": 40000
          }
      }

      Policy = {
          "uLThptPerSlice": {
              "maxThpt": {
                  "upperLimit": 700000,
                  "lowerLimit": 40000,
                  "increment": 50000,
                  "decrement": 50000
              }
          },
          "dLThptPerSlice": {
              "maxThpt": {
                  "upperLimit": 700000,
                  "lowerLimit": 400000,
                  "increment": 50000,
                  "decrement": 50000
              }
          }
      }

      tnNssi = await CLG.getinstance().preparation(
          closedLoopTypeName="NSSI",
          closedLoopTypeDescription="TN-MD NSSI closedloop",
          targetEntityList=[
              {
                  "managedEntityType": "NetworkSlice",
                  "managedEntityId": nSId
              },
              {
                  "managedEntityType": "NetworkSliceSubnet",
                  "managedEntityId": tnNSSId
              }
          ],
          closedLoopGoal=nSSIGoal,
          closedLoopPolicy=Policy,
          metadata={
              "nSSId": tnNSSId
          }
      )
      cnNssi = await CLG.getinstance().preparation(
          closedLoopTypeName="NSSI",
          closedLoopTypeDescription="CN-MD NSSI closedloop",
          targetEntityList=[
              {
                  "managedEntityType": "NetworkSlice",
                  "managedEntityId": nSId
              },
              {
                  "managedEntityType": "NetworkSliceSubnet",
                  "managedEntityId": cnNSSId
              }
          ],
          closedLoopGoal=nSSIGoal,
          closedLoopPolicy=Policy,
          metadata={
              "nSSId": cnNSSId
          }
      )
      dnNssi = await CLG.getinstance().preparation(
          closedLoopTypeName="NSSI",
          closedLoopTypeDescription="DN-MD NSSI closedloop",
          targetEntityList=[
              {
                  "managedEntityType": "NetworkSlice",
                  "managedEntityId": nSId
              },
              {
                  "managedEntityType": "NetworkSliceSubnet",
                  "managedEntityId": dnNSSId
              }
          ],
          closedLoopGoal=nSSIGoal,
          closedLoopPolicy=Policy,
          metadata={
              "nSSId": dnNSSId
          }
      )
      nsi = await CLG.getinstance().preparation(
          closedLoopTypeName="NSI",
          targetEntityList=[
              {
                  "managedEntityType": "ClosedLoop",
                  "managedEntityId": tnNssi.closedLoopInstanceUniqueId
              },
              {
                  "managedEntityType": "ClosedLoop",
                  "managedEntityId": cnNssi.closedLoopInstanceUniqueId
              },
              {
                  "managedEntityType": "ClosedLoop",
                  "managedEntityId": dnNssi.closedLoopInstanceUniqueId
              },
              {
                  "managedEntityType": "NetworkSlice",
                  "managedEntityId": nSId
              }
          ],
          closedLoopGoal=nSIGoal,
          metadata={
              "nSId": nSId
          }
      )
      await CLG.getinstance().commissioning(
          nsi.closedLoopInstanceUniqueId
      )


上記のFunctionプラグインを作成したら、REPLを用いて実行してみてください。

await context.testPreparation()


Qmonus SDKポータルのクローズドループ操作画面でインスタンスの稼働状態やPodのログが参照できます。
次にCloud Pub/Subに対してFMデータを送信し、クローズドループによる自律制御が行われることを確認します。そのためにCloud Pub/Subをメッセージ操作を簡単に行うためのModuleプラグインを作成しておきます。

category: CLA Tutorial
name: pubsub
code: |
  import os
  import random
  import aiofiles
  from gcloud.aio.pubsub import PublisherClient, SubscriberClient, PubsubMessage

  async def getEnvironment():
      (SA, PROJECT) = (os.environ["PUBSUB_SA_PATH"], None)
      async with aiofiles.open(SA, mode="r") as f:
          _ = await f.read()
          PROJECT = MU(json.loads(_)).project_id
      return SA, PROJECT


  async def listTopics():
      SA, PROJECT = await context.pubsub.getEnvironment()
      async with PublisherClient(service_file=SA) as client:
          return await client.list_topics(f"projects/{PROJECT}")


  async def deleteTopic(topic):
      SA, PROJECT = await context.pubsub.getEnvironment()
      async with PublisherClient(service_file=SA) as client:
          return await client.delete_topic(f"projects/{PROJECT}/topics/{topic}")


  async def publish(topic, data):
      SA, PROJECT = await context.pubsub.getEnvironment()

      async with PublisherClient(service_file=SA) as client:
          topic = client.topic_path(PROJECT, topic)
          messages = []
          if type(data) == list:
              messages = [
                  PubsubMessage(json.dumps(i), attribute="value") for i in data
              ]
          else:
              messages = [
                  PubsubMessage(json.dumps(data), attribute="value")
              ]
          return await client.publish(topic, messages)


  async def listSubscriptions():
      SA, PROJECT = await context.pubsub.getEnvironment()
      async with SubscriberClient(service_file=SA) as client:
          return await client.list_subscriptions(f"projects/{PROJECT}")


  async def deleteSubscription(subscription):
      SA, PROJECT = await context.pubsub.getEnvironment()
      async with SubscriberClient(service_file=SA) as client:
          return await client.delete_subscription(
              f"projects/{PROJECT}/subscriptions/{subscription}"
          )


  async def getSubscription(subscription):
      SA, PROJECT = await context.pubsub.getEnvironment()
      async with SubscriberClient(service_file=SA) as client:
          return await client.get_subscription(
              f"projects/{PROJECT}/subscriptions/{subscription}"
          )

  async def test(topic1, topic2):
      notifyThresholdCrossing = {
          "href": "http://dummy.ntt.com/productInventoryManagement/v1/product",
          "notificationId": 1,
          "notificationType": "notifyThresholdCrossing",
          "eventTime": "2022-06-14T01:58:37.000+09:00",
          "systemDN": "DC=dummy.ntt.com",
          "observedPerfMetricName": "bandwidthUtilizationUp",
          "observedPerfMetricValue": 90,
          "observedPerfMetricDirection": "UP",
          "thresholdValue": 80,
          "hysteresis": 0,
          "monitorGranularityPeriod": 60
      }
      await context.pubsub.publish(topic1, notifyThresholdCrossing)

      notifyThresholdCrossing = {
          "href": "http://dummy.ntt.com/productInventoryManagement/v1/product",
          "notificationId": 1,
          "notificationType": "notifyThresholdCrossing",
          "eventTime": "2022-06-14T01:58:37.000+09:00",
          "systemDN": "DC=dummy.ntt.com",
          "observedPerfMetricName": "bandwidthUtilizationDown",
          "observedPerfMetricValue": 90,
          "observedPerfMetricDirection": "UP",
          "thresholdValue": 80,
          "hysteresis": 0,
          "monitorGranularityPeriod": 60
      }
      await context.pubsub.publish(topic2, notifyThresholdCrossing)


Moduleプラグインの最後の関数が、FMデータを送信する関数です。引数にはtopic名を2つ与えられます。Qmonus SDKポータルで一覧表示されるNSSIクローズドループの詳細情報からtargetEntityListに含まれるNetworkSliceSubnetのエンティティIDをメモしてください。今回3つの管理ドメインを設けていますが2つの管理ドメインのエンティティIDをメモしてこのtest関数に与えます。 test関数は先頭に与えられたNSSIクローズドループに対して上り帯域の超過アラームを送信します。2つ目に与えられたNSSIクローズドループに対しては下り帯域の超過アラームを送信します。これらは異なるクローズドループでそれぞれ処理されますが、coordination関数でマージされ、双方向の帯域超過として1つのアクションに束ねられNSIクローズドループに通知されます。

await context.pubsub.test(
    "E2EMD-a680e3aeec3911ec85f3c22fcb4cea8d",
    "E2EMD-a680df3aec3911ec85f3c22fcb4cea8d"
)


以下のログがNSIのクローズドループのPodに出力されれば動作確認は完了です。

[I 220615 01:28:01 xaas:5] Send slice change request to NSMF...
[{'dLThptPerSlice': {'guaThpt': 200000, 'maxThpt': 450000}, 'uLThptPerSlice': {'guaThpt': 20000, 'maxThpt': 90000}}]


CLA開発の流れ

CLA開発は、難易度が高めです。チュートリアルをカスタマイズするなど修練が必要です。トレーニングやノウハウ共有などのサポートはプロフェッショナルサポートにて対応しておりますので窓口までご連絡ください。最後にCLA開発の流れを以下の図に記載します。



Tip

ClosedLoopComponentで実行されるFunctionプラグインの処理内容を修正しても稼働中のクローズドループ動作には影響しません。動作を更新したい場合は、クローズドループのdeactivation/activation操作が必要です。