blob: b10269257f2b08eea445daa8ff299c56ef3a06ff [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import random
from chip import ChipDeviceCtrl
from chip import clusters as Clusters
from chip import commissioning
from chip.commissioning import commissioning_flow_blocks, pase
class ExampleCustomMatterCommissioningFlow(commissioning_flow_blocks.CommissioningFlowBlocks):
def __init__(self, devCtrl: ChipDeviceCtrl.ChipDeviceControllerBase, credential_provider: commissioning.CredentialProvider, logger: logging.Logger):
super().__init__(devCtrl=devCtrl, credential_provider=credential_provider, logger=logger)
self._logger = logger
async def commission(self, parameter: commissioning.Parameters):
# The example uses PASE, however, the blocks uses a node_id, which supports both PASE and CASE.
with pase.establish_session(devCtrl=self._devCtrl, parameter=parameter.pase_param) as device:
node_id = device.node_id
self._logger.info("Sending ArmFailSafe to device")
await self.arm_failsafe(node_id=node_id, duration_seconds=parameter.failsafe_expiry_length_seconds)
self._logger.info("Setting Regulatory Configuration")
await self.send_regulatory_config(parameter=parameter, node_id=node_id)
self._logger.info("OperationalCredentials Commissioning")
case_nodeid = await self.operational_credentials_commissioning(parameter=parameter, node_id=node_id)
if not parameter.commissionee_info.is_ethernet_device:
self._logger.info("Network Commissioning")
await self.network_commissioning(parameter=parameter, node_id=node_id)
else:
self._logger.info("Device is an ethernet device, network commissioning not required.")
self._logger.info("Completing Commissioning")
await self.complete_commission(case_nodeid)
self._logger.info("Commissioning Completed")
class ExampleCredentialProvider:
def __init__(self, devCtrl: ChipDeviceCtrl.ChipDeviceController):
self._devCtrl = devCtrl
async def get_attestation_nonce(self) -> bytes:
return os.urandom(32)
async def get_csr_nonce(self) -> bytes:
return os.urandom(32)
async def get_commissionee_credentials(self, request: commissioning.GetCommissioneeCredentialsRequest) -> commissioning.GetCommissioneeCredentialsResponse:
node_id = random.randint(100000, 999999)
nocChain = self._devCtrl.IssueNOCChain(Clusters.OperationalCredentials.Commands.CSRResponse(
NOCSRElements=request.csr_elements, attestationSignature=request.attestation_signature), nodeId=node_id)
return commissioning.GetCommissioneeCredentialsResponse(
rcac=nocChain.rcacBytes,
noc=nocChain.nocBytes,
icac=nocChain.icacBytes,
ipk=nocChain.ipkBytes,
case_admin_node=self._devCtrl.nodeId,
admin_vendor_id=self._devCtrl.fabricAdmin.vendorId,
node_id=node_id,
fabric_id=self._devCtrl.fabricId)