blob: 04dd8de3494eda23e5323456ca1f7a154700a3bf [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import os
import traceback
from dataclasses import dataclass
from textwrap import dedent
from typing import Callable
from probe.ip_utils import get_addr_type
from utils.artifact import create_standard_log_name, log
from utils.log import add_border, border_print
from zeroconf import ServiceBrowser, ServiceInfo, ServiceListener, Zeroconf
logger = log.get_logger(__file__)
@dataclass()
class MdnsTypeInfo:
type: str
description: str
commissioner = MdnsTypeInfo(
"COMMISSIONER",
"This is a service for a Matter commissioner aka. controller"
)
commissionable = MdnsTypeInfo(
"COMMISSIONABLE / EXTENDED DISCOVERY",
"This is a service to be used in the commissioning process and provides more info about the device."
)
operational = MdnsTypeInfo(
"OPERATIONAL",
"This is a service for a commissioned Matter device. It exposes limited info about the device."
)
border_router = MdnsTypeInfo(
"THREAD BORDER ROUTER",
"This is a service for a thread border router; may be used for thread+Matter devices."
)
_MDNS_TYPES = {
"_matterd._udp.local.": commissioner,
"_matterc._udp.local.": commissionable,
"_matter._tcp.local.": operational,
"_meshcop._udp.local.": border_router,
}
@dataclass()
class RecordParser:
readable_name: str
explanation: str
parse: Callable[[str], str]
# TODO: Meshcop parser
class MatterTxtRecordParser:
def __init__(self):
self.parsers = {
"D": RecordParser("Discriminator",
dedent("\
Differentiates this instance of the device from others w/ same VID/PID that might be \n\
in the environment."),
MatterTxtRecordParser.parse_d), # To hex
"VP": RecordParser("VID/PID",
"The Vendor ID and Product ID (each are two bytes of hex) that identify this product.",
MatterTxtRecordParser.parse_vp), # Split + to hex
"CM": RecordParser("Commissioning mode",
"Whether the device is in commissioning mode or not.",
MatterTxtRecordParser.parse_cm), # Decode
"DT": RecordParser("Device type",
"Application type for this end device.",
MatterTxtRecordParser.parse_dt), # Decode
"DN": RecordParser("Device name",
"Manufacturer provided device name. MAY match NodeLabel in Basic info cluster.",
MatterTxtRecordParser.parse_pass_through), # None
"RI": RecordParser("Rotating identifier",
"Vendor specific, non-trackable per-device ID.",
MatterTxtRecordParser.parse_pass_through), # None
"PH": RecordParser("Pairing hint",
dedent("\
Given the current device state, follow these instructions to make the device \n\
commissionable."),
MatterTxtRecordParser.parse_ph), # Decode
"PI": RecordParser("Pairing instructions",
dedent("\
Used with the Pairing hint. If the Pairing hint mentions N, this is the \n\
value of N."),
MatterTxtRecordParser.parse_pass_through), # None
# General records
"SII": RecordParser("Session idle interval",
"Message Reliability Protocol retry interval while the device is idle in milliseconds.",
MatterTxtRecordParser.parse_pass_through), # None
"SAI": RecordParser("Session active interval",
dedent("\
Message Reliability Protocol retry interval while the device is active \n\
in milliseconds."),
MatterTxtRecordParser.parse_pass_through), # None
"SAT": RecordParser("Session active threshold",
"Duration of time this device stays active after last activity in milliseconds.",
MatterTxtRecordParser.parse_pass_through), # None
"T": RecordParser("Supports TCP",
"Whether this device supports TCP client and or Server.",
MatterTxtRecordParser.parse_t), # Decode
}
self.unparsed_records = ""
self.parsed_records = ""
def parse_single_record(self, key: str, value: str):
parser: RecordParser = self.parsers[key]
self.parsed_records += add_border(parser.readable_name + "\n")
self.parsed_records += parser.explanation + "\n\n"
try:
self.parsed_records += "PARSED VALUE: " + parser.parse(value) + "\n"
except Exception:
logger.error("Exception parsing TXT record, appending raw value")
logger.error(traceback.format_exc())
self.parsed_records += f"RAW VALUE: {value}\n"
def get_output(self) -> str:
unparsed_exp = "\nThe following TXT records were not parsed or explained:\n"
parsed_exp = "\nThe following was discovered about this device via TXT records:\n"
ret = ""
if self.unparsed_records:
ret += unparsed_exp + self.unparsed_records
if self.parsed_records:
ret += parsed_exp + self.parsed_records
return ret
def parse_records(self, info: ServiceInfo) -> str:
if info.properties is not None:
for name, value in info.properties.items():
try:
name = name.decode("utf-8")
except UnicodeDecodeError:
name = str(name)
try:
value = value.decode("utf-8")
except UnicodeDecodeError:
value = str(value)
if name not in self.parsers:
self.unparsed_records += f"KEY: {name} VALUE: {value}\n"
else:
self.parse_single_record(name, value)
return self.get_output()
@staticmethod
def parse_pass_through(txt_value: str) -> str:
return txt_value
@staticmethod
def parse_d(txt_value: str) -> str:
return hex(int(txt_value))
@staticmethod
def parse_vp(txt_value: str) -> str:
vid, pid = txt_value.split("+")
vid, pid = hex(int(vid)), hex(int(pid))
return f"VID: {vid}, PID: {pid}"
@staticmethod
def parse_cm(txt_value: str) -> str:
cm = int(txt_value)
mode_descriptions = [
"Not in commissioning mode",
"In passcode commissioning mode (standard mode)",
"In dynamic passcode commissioning mode",
]
return mode_descriptions[cm]
@staticmethod
def parse_dt(txt_value: str) -> str:
application_device_types = {
# lighting
"0x100": "On/Off Light",
"0x101": "Dimmable Light",
"0x10C": "Color Temperature Light",
"0x10D": "Extended Color Light",
# smart plugs/outlets and other actuators
"0x10A": "On/Off Plug-in Unit",
"0x10B": "Dimmable Plug-In Unit",
"0x303": "Pump",
# switches and controls
"0x103": "On/Off Light Switch",
"0x104": "Dimmer Switch",
"0x105": "Color Dimmer Switch",
"0x840": "Control Bridge",
"0x304": "Pump Controller",
"0xF": "Generic Switch",
# sensors
"0x15": "Contact Sensor",
"0x106": "Light Sensor",
"0x107": "Occupancy Sensor",
"0x302": "Temperature Sensor",
"0x305": "Pressure Sensor",
"0x306": "Flow Sensor",
"0x307": "Humidity Sensor",
"0x850": "On/Off Sensor",
# closures
"0xA": "Door Lock",
"0xB": "Door Lock Controller",
"0x202": "Window Covering",
"0x203": "Window Covering Controller",
# HVAC
"0x300": "Heating/Cooling Unit",
"0x301": "Thermostat",
"0x2B": "Fan",
# media
"0x28": "Basic Video Player",
"0x23": "Casting Video Player",
"0x22": "Speaker",
"0x24": "Content App",
"0x29": "Casting Video Client",
"0x2A": "Video Remote Control",
# generic
"0x27": "Mode Select",
}
return application_device_types[hex((int(txt_value))).upper().replace("0X", "0x")]
@staticmethod
def parse_ph(txt_value: str) -> str:
pairing_hints = [
"Power Cycle",
"Custom commissioning flow",
"Use existing administrator (already commissioned)",
"Use settings menu on device",
"Use the PI TXT record hint",
"Read the manual",
"Press the reset button",
"Press Reset Button with application of power",
"Press Reset Button for N seconds",
"Press Reset Button until light blinks",
"Press Reset Button for N seconds with application of power",
"Press Reset Button until light blinks with application of power",
"Press Reset Button N times",
"Press Setup Button",
"Press Setup Button with application of power",
"Press Setup Button for N seconds",
"Press Setup Button until light blinks",
"Press Setup Button for N seconds with application of power",
"Press Setup Button until light blinks with application of power",
"Press Setup Button N times",
]
ret = "\n"
b_arr = [int(b) for b in bin(int(txt_value))[2:]][::-1]
for i in range(0, len(b_arr)):
b = b_arr[i]
if b:
ret += pairing_hints[i] + "\n"
return ret
@staticmethod
def parse_t(txt_value: str) -> str:
return "TCP supported" if int(txt_value) else "TCP not supported"
class MatterDnssdListener(ServiceListener):
def __init__(self, artifact_dir: str) -> None:
super().__init__()
self.artifact_dir = artifact_dir
self.logger = logger
self.discovered_matter_devices: [str, ServiceInfo] = {}
def write_log(self, line: str, log_name: str) -> None:
with open(self.create_device_log_name(log_name), "a+") as log_file:
log_file.write(line)
def create_device_log_name(self, device_name) -> str:
return os.path.join(
self.artifact_dir,
create_standard_log_name(f"{device_name}_dnssd", "txt"))
@staticmethod
def log_addr(info: ServiceInfo) -> str:
ret = add_border("This device has the following IP addresses\n")
for addr in info.parsed_scoped_addresses():
ret += f"{get_addr_type(addr)}: {addr}\n"
return ret
def handle_service_info(
self,
zc: Zeroconf,
type_: str,
name: str,
delta_type: str) -> None:
info = zc.get_service_info(type_, name)
self.discovered_matter_devices[name] = info
to_log = f"{name}\n"
update_str = f"\nSERVICE {delta_type}\n"
to_log += ("*" * (len(update_str) - 2)) + update_str
to_log += _MDNS_TYPES[type_].type + "\n"
to_log += _MDNS_TYPES[type_].description + "\n"
to_log += f"A/SRV TTL: {str(info.host_ttl)}\n"
to_log += f"PTR/TXT TTL: {str(info.other_ttl)}\n"
txt_parser = MatterTxtRecordParser()
to_log += txt_parser.parse_records(info)
to_log += self.log_addr(info)
self.logger.info(to_log)
self.write_log(to_log, name)
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
self.handle_service_info(zc, type_, name, "ADDED")
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
self.handle_service_info(zc, type_, name, "UPDATED")
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
to_log = f"Service {name} removed\n"
to_log += _MDNS_TYPES[type_].type + "\n"
to_log += _MDNS_TYPES[type_].description
if name in self.discovered_matter_devices:
del self.discovered_matter_devices[name]
self.logger.warning(to_log)
self.write_log(to_log, name)
def browse_interactive(self) -> None:
zc = Zeroconf()
ServiceBrowser(zc, list(_MDNS_TYPES.keys()), self)
try:
self.logger.warning(
dedent("\
\n\
Browsing Matter DNS-SD\n\
DCL Lookup: https://webui.dcl.csa-iot.org/\n\
See spec section 4.3 for details of Matter TXT records.\n"))
border_print("Press enter to stop!", important=True)
input("")
finally:
zc.close()
async def browse_once(self, browse_time_seconds: int) -> Zeroconf:
zc = Zeroconf()
ServiceBrowser(zc, list(_MDNS_TYPES.keys()), self)
await asyncio.sleep(browse_time_seconds)
zc.close()
return zc