blob: fff81bd8775a2dfc729f16a26b9f9ad667a971f0 [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
from enum import Enum
from queue import Queue
from typing import Optional
from chip.internal import GetCommissioner
from chip.internal.commissioner import PairingState
# Fallback node id used by _StartAsyncConnection when the caller does not
# supply a (truthy) deprecated_nodeid.
TEST_NODE_ID = 11223344
class PairNotificationType(Enum):
    """Kinds of pairing events that get queued for a waiting Connection."""

    # Queued when the controller asks for network credentials.
    NETWORK_CREDENTIALS = 0

    # Queued when the controller asks for operational credentials.
    OPERATIONAL_CREDENTIALS = 1

    # Queued when the controller reports that pairing has ended.
    COMPLETE = 2
@dataclass
class _PairNotification:
    """A single pairing event passed from a controller callback to the waiter."""

    # Which pairing callback produced this notification.
    type: PairNotificationType

    # Value handed to the operational-credentials callback; None for the
    # other notification kinds.
    csr: Optional[bytes]

    # Value handed to the pairing-complete callback; None for the other
    # notification kinds.
    error_code: Optional[int]
class Connection:
    """Tracks the progress of one BLE pairing driven through a controller.

    The controller callbacks installed by `_Pair` push notifications onto an
    internal queue. `_WaitForPairProgress` consumes one notification per call
    and refreshes the public state flags:

      needsNetworkCredentials: the most recent notification requested network
        credentials (answer with ConnectToWiFi or ConnectToThread).
      needsOperationalCredentials: the most recent notification requested
        operational credentials.
      paired: pairing completed with error code 0.
    """

    def __init__(self, controller):
        self.needsNetworkCredentials = False
        self.needsOperationalCredentials = False
        self.paired = False

        self._controller = controller
        self._pair_queue = Queue()

        # True once a COMPLETE notification has been consumed, whether the
        # pairing succeeded or failed; no further notifications are awaited.
        self._pairing_finished = False

    def ConnectToWiFi(self, ssid: str, password: str):
        """Sends WiFi credentials and waits for the next pairing notification.

        Raises:
          Exception: if network credentials are not currently requested, or
            if pairing then completes with a non-zero error code.
        """
        self._EnsureNetworkCredentialsRequested()
        self._controller.PairSendWiFiCredentials(ssid, password)
        self._WaitForPairProgress()

    def ConnectToThread(self, blob: bytes):
        """Sends Thread credentials and waits for the next pairing notification.

        Raises:
          Exception: if network credentials are not currently requested, or
            if pairing then completes with a non-zero error code.
        """
        self._EnsureNetworkCredentialsRequested()
        self._controller.PairSendThreadCredentials(blob)
        self._WaitForPairProgress()

    def _EnsureNetworkCredentialsRequested(self):
        """Raises unless the last notification asked for network credentials."""
        if not self.needsNetworkCredentials:
            raise Exception("Not requiring network credentials yet.")

    def _Pair(self, discriminator: int, pin: int, deprecated_nodeid: int):
        """Sets up controller callbacks and initiates BLE pairing."""
        self._controller.on_network_credentials_requested = self._OnNetworkCredentialsRequested
        self._controller.on_operational_credentials_requested = self._OnOperationalCredentialsRequested
        self._controller.on_pairing_complete = self._OnPairingComplete

        # Note the argument order expected by BlePair: node id, pin, discriminator.
        self._controller.BlePair(deprecated_nodeid, pin, discriminator)

    def _OnNetworkCredentialsRequested(self):
        """Controller callback: queues a NETWORK_CREDENTIALS notification."""
        self._pair_queue.put(_PairNotification(
            PairNotificationType.NETWORK_CREDENTIALS, None, None))

    def _OnOperationalCredentialsRequested(self, csr):
        """Controller callback: queues an OPERATIONAL_CREDENTIALS notification."""
        self._pair_queue.put(_PairNotification(
            PairNotificationType.OPERATIONAL_CREDENTIALS, csr, None))

    def _OnPairingComplete(self, err):
        """Controller callback: queues a COMPLETE notification carrying `err`."""
        self._pair_queue.put(_PairNotification(
            PairNotificationType.COMPLETE, None, err))

    def _WaitForPairProgress(self):
        """Waits for some pairing callback progress.

        Blocks until one notification is available, then updates the state
        flags from it. Returns immediately once pairing has already finished.

        Raises:
          Exception: if the notification reports that pairing completed with
            a non-zero error code. `paired` stays False in that case.
        """
        if self.paired or self._pairing_finished:
            # Pairing already ended, no more progress possible.
            return

        step = self._pair_queue.get()

        completed = step.type == PairNotificationType.COMPLETE
        failed = completed and step.error_code != 0

        self.needsNetworkCredentials = step.type == PairNotificationType.NETWORK_CREDENTIALS
        self.needsOperationalCredentials = step.type == PairNotificationType.OPERATIONAL_CREDENTIALS
        self._pairing_finished = completed
        # Only a completion without error counts as paired; a failed pairing
        # must not leave the connection looking usable.
        self.paired = completed and not failed

        if failed:
            raise Exception('Pairing ended with error code %d' %
                            step.error_code)
def _StartAsyncConnection(discriminator: int, pin: int, deprecated_nodeid: Optional[int] = None) -> Connection:
    """Starts BLE pairing with a device and returns without waiting for progress.

    Args:
      discriminator: the advertisement discriminator used to find and connect
                     to the device
      pin: the pin number used to pair with the device (must match the
           device secret pin, typically available in a QR code)
      deprecated_nodeid: TEMPORARY, TO BE REMOVED node id that will be set
           during the commissioning. When omitted (or otherwise falsy),
           TEST_NODE_ID is used instead.

    Returns:
      A Connection on which pairing has been initiated.

    Raises:
      Exception: if the commissioner is not in the INITIALIZED pairing state.
    """
    commissioner = GetCommissioner()
    if commissioner.pairing_state != PairingState.INITIALIZED:
        raise Exception("Controller is not ready to start a new pairing")

    # Any falsy node id (None or 0) falls back to the module-level test id.
    node_id = deprecated_nodeid or TEST_NODE_ID

    pending = Connection(commissioner)
    pending._Pair(discriminator, pin, node_id)
    return pending
def Connect(discriminator: int, pin: int, deprecated_nodeid: Optional[int] = None) -> Connection:
    """Connects to a BLE device for the purpose of commissioning.

    Initiates pairing and then blocks until the first pairing notification
    has been processed, so the returned connection reflects what the device
    needs next (e.g. network credentials or operational credentials).

    Args:
      discriminator: the advertisement discriminator used to find and connect
                     to the device
      pin: the pin number used to pair with the device (must match the
           device secret pin, typically available in a QR code)
      deprecated_nodeid: TEMPORARY, TO BE REMOVED node id that will be set
           during the commissioning. This will be replaced by setting the
           node id at the time of setting operational credentials once such
           credentials are supported.

    Returns:
      The Connection for the pairing that was started.

    Raises:
      Exception: if a new pairing cannot be started, or if the first
        notification reports that pairing ended with a non-zero error code.
    """
    conn = _StartAsyncConnection(discriminator, pin, deprecated_nodeid)
    # Consume exactly one notification before handing the connection back.
    conn._WaitForPairProgress()
    return conn
# Public API of this module: only Connect is exported by a star-import.
__all__ = [
'Connect',
]