ネットワーク機器のコンフィグオーダをキューイングして複数のデバイスを並行処理しつつ、デバイス毎にシーケンシャルにコンフィグを投入するアプリケーションを作成するにはどのように設計すれば良いでしょうか?



設計方式としては幾つか考えられますが、例としてQmonus SDKの組込みマルチキューであるmFIFOオブジェクトを中心としたアプリケーション設計を紹介します。


単純なFIFOでは、Reflectorのマルチワーカー処理で同一デバイスに対して競合制御を避ける実装が困難です。mFIFOでは、複数のチャネルキューを利用でき、データの取り出しは、コンテキストマネージャによってチャネルロックや例外発生時のデータ再挿入が自動で行われるため、マルチワーカーで複数のデバイスに並行処理しつつも、同一デバイスに対してはシーケンシャルに制御することが可能です。


以下の図は、mFIFOの動作イメージです。各ワーカーは、ロックが獲得できるチャネルキューからデータを取り出して並行動作します。Worker#2は、Device#1チャネルを処理しているWorker#1のロックで排他され、代わりにDevice#2を読み出しています。


mFIFOを中心としたアプリケーション設計として以下のようなシーケンス設計が考えられます。



サンプルアプリケーション

上記のシーケンスのように動作するアプリケーションを作成します。プラグインとしては、コンフィグを生成するためのTemplate、オーダ情報を管理するデータモデルとしてATOM、APIを提供するScenario、デバイスへのコンフィグを行うWorkerを作成します。

API設計

はじめにサンプルアプリケーションが提供するAPIを規定します。以下は、OAS3で記述していますのでhttps://editor.swagger.io/等で確認してください。

openapi: 3.0.1
info:
  title: Device Config Order
  description: 'Qmonus SDK mFIFO built-in usage example'
  version: 1.0.0
servers:
- url: http://localhost:9099/v1
tags:
- name: configurationOrders
paths:
  /configurationOrders:
    post:
      tags:
      - configurationOrders
      summary: 新規のコンフィグオーダを登録するAPI
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ConfigOrder'
        required: true
      responses:
        202:
          description: オーダ受付完了応答
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/PostResponse'
        400:
          description: オーダ情報不正応答
          content: 
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
  /configurationOrders/{orderId}:
    get:
      tags:
      - configurationOrders
      summary: コンフィグオーダの状態を取得するAPI
      parameters:
      - name: orderId
        in: path
        description: 取得するコンフィグオーダのID
        required: true
        schema:
          type: string
          format: uuid
      responses:
        200:
          description: オーダ状態取得完了応答
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/GetResponse'
        404:
          description: オーダ情報取得失敗応答
          content: 
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
components:
  schemas:
    ErrorResponse:
      type: object
      required:
      - errorCode
      - errorMessage
      - modeInfo
      properties:
        errorCode:
          type: integer
          example: 400
        errorMessage:
          type: string
          example: Invalid request
        moreInfo:
          type: object
    PostResponse:
      type: object
      required:
      - orderId
      properties:
        orderId:
          type: string
          format: uuid
    GetResponse:
      type: object
      required:
      - orderId
      - status
      - template
      - parameters
      - deviceType
      properties:
        orderId:
          type: string
          format: uuid
        template:
          type: string
          description: テンプレートサービスに登録されている任意のテンプレート名
          example: createSubInterface
        parameters:
          type: object
          description: テンプレートにレンダリングするパラメータ辞書
        deviceType:
          type: string
          description: ncclientで規定されているデバイス種別
          default: default
          enum:
          - default
          - junos
          - csr
          - nexus
          - iosxr
          - iosxe
          - huawei
          - huaweiyang
          - alu
          - h3c
          - hpcomware
    ConfigOrder:
      required:
      - orderId
      - host
      - port
      - username
      - password
      - template
      - deviceType
      - parameters
      type: object
      properties:
        orderId:
          type: string
          format: uuid
        host:
          type: string
          example: 192.168.2.200
        port:
          type: integer
          example: 830
          default: 830
        username:
          type: string
          example: qmonus
        password:
          type: string
          example: qmonus
        template:
          type: string
          description: テンプレートサービスに登録されている任意のテンプレート名
          example: createSubInterface
        parameters:
          type: object
          description: テンプレートにレンダリングするパラメータ辞書
        deviceType:
          type: string
          description: Device type specified by ncclient
          default: default
          enum:
          - default
          - junos
          - csr
          - nexus
          - iosxr
          - iosxe
          - huawei
          - huaweiyang
          - alu
          - h3c
          - hpcomware


Note

認証情報は、省略しています。


コンフィグ生成のためのTemplateを作成する

機器に設定するコンフィグレーションを生成するためのテンプレートを定義します。コンフィグの内容は、何でも良いので自由に記述してください。ここではCSR1000vにサブインタフェースを作成するテンプレートとします。
本サンプルアプリケーションでは、APIで受信したパラメータでテンプレートをレンダリングしますが、様々なテンプレートを追加して1つのAPIでいろんなコンフィグができるように実装するため、パラメータのバリデーションをScenarioのrequest_validationでは行わず、テンプレート側でバリデーションします。そのため、schemaオプションに本テンプレートの入力パラメータスキーマを定義しています。また、APIクライアントからは実行したいテンプレートを指定するだけでロールバックに対する指示は不要にしたいのでmetadataオプションでロールバック時に使用するテンプレートを記述しています。

Template機能の詳細は、Docs » Scenario » テンプレートを参照してください。


  • サブインタフェース作成用テンプレート
- tag: createSubInterface
  template: |-
    <config>
      <cli-config-data>
        <cmd>interface gigabitEthernet 3.{{ vlan }}</cmd>
        <cmd>encapsulation dot1Q {{ vlan }}</cmd>
        <cmd>ip address 172.16.{{ vlan }}.1 255.255.255.0</cmd>
        <cmd>no shutdown</cmd>
      </cli-config-data>
    </config>
  metadata:
    rollback: deleteSubInterface
  schema:
    type: object
    properties:
      vlan:
        maximum: 4094
        mininum: 1
        type: integer
    required:
      - vlan
  expire_seconds: 3600


  • サブインタフェース削除用テンプレート
- tag: deleteSubInterface
  template: |-
    <config>
      <cli-config-data>
        <cmd>interface gigabitEthernet 3.{{ vlan }}</cmd>
        <cmd>no ip address 172.16.{{ vlan }}.1 255.255.255.0</cmd>
        <cmd>no encapsulation dot1Q</cmd>
        <cmd>exit</cmd>
        <cmd>no interface gigabitEthernet 3.{{ vlan }}</cmd>
      </cli-config-data>
    </config>
  metadata:
    rollback: createSubInterface
  schema:
    type: object
    properties:
      vlan:
        maximum: 4094
        mininum: 1
        type: integer
    required:
      - vlan
  expire_seconds: 3600


テンプレートが作成できたら、Frontalでレンダリングをテストしてみてください。


コンフィグオーダを管理するATOMを作成する

コンフィグオーダを管理するデータモデルとしてConfigOrderクラスを定義します。APIクライアントは、コンフィグオーダをポーリングして機器制御の完了を待ち受けます。そのため、コンフィグオーダの状態を管理する必要があります。
本サンプルでは、コンフィグ状態を以下の状態遷移で実装します。


以下は、ConfigOrderクラスの定義です。状態遷移は、後ほど作成するWorkerで行うので本クラスは単純なデータモデルとして利用します。
ttlフィールドは、Failed状態となった後、ロールバックを試行しますが、オーダ毎にロールバックリトライの試行回数を指定できるように定義しています。 Workerでロールバックリトライ時にttlフィールドの値をデクリメントして0まで試行したらRollbackFailedに遷移させるように実装します。

category: example
name: ConfigOrder
persistence: true
abstract: false
api_generation: false
attributes:
  identifier:
    field_immutable: true
    field_name: orderId
    field_persistence: true
    field_type: string
  local_fields:
    - field_immutable: false
      field_name: host
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_immutable: false
      field_name: port
      field_nullable: false
      field_persistence: true
      field_type: integer
      field_unique: false
    - field_immutable: false
      field_name: username
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_immutable: false
      field_name: password
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_immutable: false
      field_name: template
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_immutable: false
      field_name: rollbackTemplate
      field_nullable: true
      field_persistence: true
      field_type: string
      field_unique: false
    - field_default: Processing
      field_enum:
        - Processing
        - Completed
        - Failed
        - RollbackCompleted
        - RollbackFailed
      field_immutable: false
      field_name: status
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_default: dict()
      field_immutable: false
      field_name: parameters
      field_nullable: false
      field_persistence: true
      field_type: object
      field_unique: false
    - field_default: default
      field_enum:
        - default
        - junos
        - csr
        - nexus
        - iosxr
        - iosxe
        - huawei
        - huaweiyang
        - alu
        - h3c
        - hpcomware
      field_immutable: false
      field_name: deviceType
      field_nullable: false
      field_persistence: true
      field_type: string
      field_unique: false
    - field_default: '2'
      field_immutable: false
      field_name: ttl
      field_nullable: false
      field_persistence: true
      field_type: integer
      field_unique: false
  ref_fields: []
methods:
  class_methods: []
  instance_methods: []


コンフィグオーダをAPIで受付するScenarioを作成する

様々なコンフィグオーダをAPIで受付し、ConfigOrderを生成して、mFIFOにオーダをキューイング後、応答を返却するシナリオを作成します。
同期型のAPIとなるため、トランザクション機能は使用しません。 request_validationコマンドを配置してリクエストヘッダとリクエストbodyをバリデーションします。どんなコンフィグをオーダするのかは、APIクライアントがtemplate名を指定することで決定します。テンプレートへのレンダリングパラメータは、テンプレート毎に異なるため、request_validationのバリデーションスキーマではバリデーションせず、事後プロセスのpythonコードでバリデーションしています。最初にテンプレート作成時にテンプレート毎にschemaを定義しているのでそれを利用してバリデーションします。template.validateの呼び出しによってAPIのparametersで指定されたレンダリングパラメータが適正かどうかをチェックしています。バリデーションエラーの場合は、400 BadRequestが返却されます。
次にテンプレートのmetadataにロールバック用のテンプレートが設定されていればコンフィグオーダのロールバックに利用するテンプレート名をrollbackTemplateに設定してオーダを保存します。最後にオーダ情報をイベントキューに挿入してAPIクライアントに応答を返却します。

- category: example
  name: registConfigOrder
  uri: /v1/configurationOrders
  method: POST
  routing_auto_generation_mode: true
  connect_timeout: 60
  request_timeout: 60
  commands:
    - command: request_validation
      kwargs:
        aspect_options:
          post:
            process: |-
              # インスタンス生成
              order = atom.ConfigOrder(**context.request.body.dictionary)

              # テンプレート存在チェック
              if not await Template.exists(order.template):
                  raise Error(400, reason="Configuration template not found %r" % order.template)

              # テンプレート読み込み
              template = await Template.load(order.template)

              # パラメータ妥当性検査
              await template.validate(**order.parameters)

              # rollbackテンプレート選定
              if template.metadata and template.metadata.get("rollback", None):
                  order.rollbackTemplate = template.metadata["rollback"]

              # インスタンス保存
              await order.save()

              # イベントキューイング
              await mFIFO("configOrder").put(order.host, order.dictionary)

              # 応答返却
              context.session.set_status(202)
              context.session.finish(dict(orderId=context.request.body.orderId))
        body:
          type: object
          properties:
            deviceType:
              enum:
                - default
                - junos
                - csr
                - nexus
                - iosxr
                - iosxe
                - huawei
                - huaweiyang
                - alu
                - h3c
                - hpcomware
              type: string
            host:
              type: string
            orderId:
              type: string
            parameters:
              type: object
            password:
              type: string
            port:
              type: integer
            template:
              type: string
            username:
              type: string
          required:
            - orderId
            - host
            - port
            - username
            - password
            - template
            - parameters
            - deviceType
        headers:
          type: object
          properties:
            Content-Type:
              enum:
                - application/json
              type: string
          required:
            - Content-Type


コンフィグオーダの進捗状態をAPIでポーリングするためのScenarioを作成する

指定されたorderIdに該当するコンフィグオーダを応答します。同期型のAPIとなるため、トランザクション機能は使用しません。 ここでは単純にコンフィグオーダのインスタンスをロードしてローカルフィールドの情報を返却しています。

- category: example
  name: getConfigOrder
  uri: '/v1/configurationOrders/{orderId}'
  method: GET
  request_timeout: 60
  connect_timeout: 60
  routing_auto_generation_mode: true
  commands:
    - command: script
      kwargs:
        code: |-
          order = await atom.ConfigOrder.load(context.request.resources.orderId)
          if not order:
              raise Error(404, reason="Not found")

          """機器へのアクセス情報は不要なので除外
          """
          context.session.finish(order.localfields(ignore_fields=["host", "port", "username", "password"]))


コンフィグを実行するWorkerを作成する

コンフィグオーダをmFIFOから取り出してコンフィグを生成し、NETCONFで機器に設定して結果をコンフィグオーダ状態に反映します。

- category: example
  name: Configurator
  mode: multi
  script: |-
    async def consume():
        """ランタイムが正常動作している間はキュー刈り取りを繰り返す
        """
        while Runtime.running():
            """unlockDelayでこのコンテキストブロックを抜けた後、チャネルロックを開放するまでの遅延を1秒挿入しています(デフォルトは、3秒)
            該当機器の制御に失敗するケースではすぐにロールバックを開始しても再度失敗する可能性が高く、同一機器に対してロールバックリトライ
            を繰り返している間、他の機器への制御が遅延してしまうことを避けるため、チャネルロック開放を遅延させて他のチャネルの読み出しを
            優先させるためにunlockDelayを設定することができます
            """
            async with mFIFO("configOrder", unlockDelay=1) as order:
                """キューが空の場合は、終了する
                ワークロードを停止するが、デフォルトでは10sec後に本関数は再起動される
                """
                if order is None:
                    return
                """DBからオーダ情報をロードする
                """
                o = await atom.ConfigOrder.load(order.data.orderId)
                """テンプレートをレンダリングしてコンフィグを生成する
                """
                config = await rendering(order.data.template, o.parameters)
                try:
                    """NETCONFでコンフィグする
                    """
                    async with Netconf(host=o.host,
                                       port=o.port,
                                       username=o.username,
                                       password=o.password,
                                       device_params=dict(name=o.deviceType),
                                       hostkey_verify=False,
                                       timeout=20) as conn:
                        target = "candidate" if conn.enableCandidate else "running"
                        if conn.enableCandidate:
                            await conn.discard_changes()

                        await conn.lock(target=target)
                        await conn.edit_config(target=target, config=config)

                        if conn.enableCandidate:
                            await conn.commit(confirmed=False, timeout=1800)

                        await conn.unlock(target=target)
                    """オーダ情報を完了状態に遷移
                    """
                    await o.save(status="Completed" if o.status=="Processing" else "RollbackCompleted")
                except:
                    """エラーが発生した場合は、ロールバックテンプレートをセットし、ロールバックオーダをキュー先頭に再挿入
                    """
                    if o.rollbackTemplate and o.ttl:
                        """オーダ情報を失敗状態に遷移
                        """
                        order.data.template = o.rollbackTemplate
                        """ConfigOrderのttlを減算して無限にロールバック失敗を繰り返さないようにしています
                        """
                        o.ttl-=1
                        await o.save(status="Failed")
                        await mFIFO("configOrder").put(order.channel, order.data.dictionary, top=True)
                        break
                    else:
                        """オーダ情報をロールバック失敗状態に遷移
                        """
                        await o.save(status="RollbackFailed")


Note

本ワーカーは、キューが空になると終了しますが、起動オプション--worker_watch_intervalに指定された間隔で再開します。デフォルトは、10secです。再開周期をカスタマイズしたい場合は、該当の起動パラメータを調整してください。

Warning

本サンプルでは、NETCONFでコンフィグ後、statupコンフィグへのcopyを省略しています。必要に応じてcopy_configを呼び出してください。


動作確認

REPLから作成したAPIを呼び出して動作を確認します。

>>> payload = dict(orderId=uuid.uuid1().hex, host="192.168.2.200", port=22, username="qmonus", password="qmonus", deviceType="iosxe", template="createSubInterface", parameters=dict(vlan=50))↵
... r = await callout(path="/v1/configurationOrders", method=POST, body=payload)↵
... print(r.body)↵
... ↵
↵
b'{"orderId":"0d56e1767bef11eb87d7acde48001122"}'
>>> r = await callout(path="/v1/configurationOrders/0d56e1767bef11eb87d7acde48001122")↵
... print(r.body)↵
... ↵
↵
b'{"orderId":"0d56e1767bef11eb87d7acde48001122","template":"createSubInterface","rollbackTemplate":"deleteSubInterface","status":"Completed","parameters":{"vlan":50},"deviceType":"iosxe","ttl":5}'
>>>


上記では、APIの呼び出しが行えることの確認と応答の確認しかできていません。実際の機器にコンフィグが反映されているか確認するためにテストケースを作成します。
テストケースは、以下のように記述できます。本テストケースは、APIでコンフィグオーダを送信して応答メッセージをチェックしたら、assert_endブロックでオーダの状態確定をポーリングし、Completed状態を検出します。その後、Netconfで対象機器からrunningコンフィグを取得し、XPATHを利用して設定が適正に行われているかをチェックしています。

category: example
name: ConfigurationTest
target: Configuration
type: testcase
fakers: {}
input:
  path: /v1/configurationOrders
  method: POST
  headers:
    Content-Type: application/json
  body: |-
    def body():
        return dict(orderId=uuid.uuid1().hex,
                    host="192.168.2.200",
                    port=22,
                    username="qmonus",
                    password="qmonus",
                    deviceType="iosxe",
                    template="createSubInterface",
                    parameters=dict(vlan=50))
assertion:
  output: |-
    async def assert_output(*args, **kwargs):
        # レスポンスコードが202 Acceptedであること
        assert Response.code==202, "Ivalud response code %r" % Response.code

        # レスポンスbodyがセットされていること
        assert Response.body, "Empty body %r" % Response.body

        # レスポンスbodyにorderIdがセットされていること
        assert "orderId" in json.loads(Response.body), "Invalid response body %s" % Response.body
  progress: []
  end: |-
    async def assert_end(*args, **kwargs):
        from lxml import etree

        (orderId, status) = (MU(json.loads(Response.body)).orderId, None)
        # オーダの状態確定をポーリングして待ち合わせます
        for i in range(30):
            if i > 0:
                await asyncio.sleep(3)
            r = await callout(path="/v1/configurationOrders/{}".format(orderId))
            # レスポンスコードが200 Successであること
            assert r.code==200, "Invalid response code %r" % r.code

            # レスポンスbodyがセットされていること
            assert r.body, "Empty body %r" % r.body

            # レスポンスbodyのstatusが適正な値にセットされていること
            status = MU(json.loads(r.body)).status
            assert status in ["Processing",
                              "Completed",
                              "Failed",
                              "RollbackCompleted",
                              "RollbackFailed"], "Invalid status %r" % status
            # statusが確定したらポーリングを抜ける
            if status in ["Completed", "RollbackCompleted", "RollbackFailed"]:
                break

        # statusがCompletedであること
        assert status=="Completed", "Unexpected status %r" % status
        o = await atom.ConfigOrder.load(orderId)

        """機器のrunningコンフィグをXMLで取得し、XPATHで設定が適切に行われていることをチェック
        """
        async with Netconf(host=o.host,
                           port=o.port,
                           username=o.username,
                           password=o.password,
                           device_params=dict(name=o.deviceType),
                           hostkey_verify=False,
                           timeout=20) as conn:
            xml = await conn.get_config(source="running", filter=("subtree", "<config-format-xml/>"))
            tree = etree.fromstring(xml.encode("utf-8"))
            ifIndex = None
            for index, element in enumerate(tree.iterfind(".//{*}interface/{*}Param")):
                if element.text == "GigabitEthernet3.%d" % o.parameters["vlan"]:
                    ifIndex = index+1
                    qprint("Created Subinterface %r(%d)" % (element.text, ifIndex))
                    break

            # サブインタフェースが存在していること
            assert ifIndex, "Subinterface not found"
            base = ".//{*}interface[%d]/{*}ConfigSubif-Configuration" % ifIndex
            vlan = tree.find("/".join([base, "{*}encapsulation/{*}dot1Q/{*}IEEEVLANIDRequired"]))
            address = tree.find("/".join([base, "{*}ip/{*}address/{*}IPAddress"]))
            netmask = tree.find("/".join([base, "{*}ip/{*}address/{*}IPSubnetMask"]))

            qprint("Subinterface settings VLAN: %s" % vlan.text if vlan.text else "VLAN not found")
            qprint("Subinterface settings IpAddress: %s" % address.text if address.text else "IpAddress not found")
            qprint("Subinterface settings IpSubnetMask: %s" % netmask.text if netmask.text else "IPSubnetMask not found")

            # 機器に設定が完了していること
            assert vlan.text and address.text and netmask.text, "Subinterface configuration incomplete"


テストケースは以下のようにREPLから実行できます。assert_endブロックのコードには、qprint組込関数を使用して取得したXMLからXPATHで抽出した設定値を出力するように記述しているのでdebug()でデバッグモードにしてから実行するとREPLで設定値を確認することができます。

>>> debug()↵
Disconnected the currently subscribed channel and connected to the debug channel ['xaas.southbound.channel']
debug channel connected
>>> await Test.run("ConfigurationTest")↵
... ↵
↵
[4c80c14a7fb511eb8b19acde48001122] Testcase.ConfigurationTest.preparation...[Passed]
[4c80c14a7fb511eb8b19acde48001122] Testcase.ConfigurationTest.assert_begin...[Passed]
[4c80c14a7fb511eb8b19acde48001122] Testcase.ConfigurationTest.call...[Passed]
[4c80c14a7fb511eb8b19acde48001122] Testcase.ConfigurationTest.assert_output...[Passed]
Created Subinterface 'GigabitEthernet3.50'(7)
Subinterface settings VLAN: 50
Subinterface settings IpAddress: 172.16.50.1
Subinterface settings IpSubnetMask: 255.255.255.0
[4c80c14a7fb511eb8b19acde48001122] Testcase.ConfigurationTest.assert_end...[Passed]
[4c80c14a7fb511eb8b19acde48001122] Testcase.ConfigurationTest.cleanup...[Passed]
>>> debug(False)↵
debug channel disconnected
>>>