| """ |
| REST client for Leshan demo server |
| ################################## |
| |
| Copyright (c) 2023 Nordic Semiconductor ASA |
| |
| SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| |
| from __future__ import annotations |
| |
| import json |
| import binascii |
| import time |
| from datetime import datetime |
| from contextlib import contextmanager |
| import requests |
| |
| class Leshan: |
| """This class represents a Leshan client that interacts with demo server's REAT API""" |
| def __init__(self, url: str): |
| """Initialize Leshan client and check if server is available""" |
| self.api_url = url |
| self.timeout = 10 |
| #self.format = 'TLV' |
| self.format = "SENML_CBOR" |
| self._s = requests.Session() |
| try: |
| resp = self.get('/security/clients') |
| if not isinstance(resp, list): |
| raise RuntimeError('Did not receive list of endpoints') |
| except requests.exceptions.ConnectionError as exc: |
| raise RuntimeError('Leshan not responding') from exc |
| |
| @staticmethod |
| def handle_response(resp: requests.models.Response): |
| """ |
| Handle the response received from the server. |
| |
| Parameters: |
| - response: The response object received from the server. |
| |
| Returns: |
| - dict: The parsed JSON response as a dictionary. |
| |
| Raises: |
| - Exception: If the response indicates an error condition. |
| """ |
| if resp.status_code >= 300 or resp.status_code < 200: |
| raise RuntimeError(f'Error {resp.status_code}: {resp.text}') |
| if len(resp.text): |
| obj = json.loads(resp.text) |
| return obj |
| return None |
| |
| def get(self, path: str): |
| """Send HTTP GET query with typical parameters""" |
| params = {'timeout': self.timeout} |
| if self.format is not None: |
| params['format'] = self.format |
| resp = self._s.get(f'{self.api_url}{path}', params=params, timeout=self.timeout) |
| return Leshan.handle_response(resp) |
| |
| def put_raw(self, path: str, data: str | dict | None = None, headers: dict | None = None, params: dict | None = None): |
| """Send HTTP PUT query without any default parameters""" |
| resp = self._s.put(f'{self.api_url}{path}', data=data, headers=headers, params=params, timeout=self.timeout) |
| return Leshan.handle_response(resp) |
| |
| def put(self, path: str, data: str | dict, uri_options: str = ''): |
| """Send HTTP PUT query with typical parameters""" |
| if isinstance(data, dict): |
| data = json.dumps(data) |
| return self.put_raw(f'{path}?timeout={self.timeout}&format={self.format}' + uri_options, data=data, headers={'content-type': 'application/json'}) |
| |
| def post(self, path: str, data: str | dict | None = None): |
| """Send HTTP POST query""" |
| if isinstance(data, dict): |
| data = json.dumps(data) |
| if data is not None: |
| headers={'content-type': 'application/json'} |
| uri_options = f'?timeout={self.timeout}&format={self.format}' |
| else: |
| headers=None |
| uri_options = '' |
| resp = self._s.post(f'{self.api_url}{path}' + uri_options, data=data, headers=headers, timeout=self.timeout) |
| return Leshan.handle_response(resp) |
| |
| def delete_raw(self, path: str): |
| """Send HTTP DELETE query""" |
| resp = self._s.delete(f'{self.api_url}{path}', timeout=self.timeout) |
| return Leshan.handle_response(resp) |
| |
| def delete(self, endpoint: str, path: str): |
| """Send LwM2M DELETE command""" |
| return self.delete_raw(f'/clients/{endpoint}/{path}') |
| |
| def execute(self, endpoint: str, path: str): |
| """Send LwM2M EXECUTE command""" |
| return self.post(f'/clients/{endpoint}/{path}') |
| |
| def write(self, endpoint: str, path: str, value: bool | int | str): |
| """Send LwM2M WRITE command to a single resource or resource instance""" |
| if len(path.split('/')) == 3: |
| kind = 'singleResource' |
| else: |
| kind = 'resourceInstance' |
| rid = path.split('/')[-1] |
| return self.put(f'/clients/{endpoint}/{path}', self._define_resource(rid, value, kind)) |
| |
| def write_attributes(self, endpoint: str, path: str, attributes: dict): |
| """Send LwM2M Write-Attributes to given path |
| example: |
| leshan.write_attributes(endpoint, '1/2/3, {'pmin': 10, 'pmax': 40}) |
| """ |
| return self.put_raw(f'/clients/{endpoint}/{path}/attributes', params=attributes) |
| |
| def remove_attributes(self, endpoint: str, path: str, attributes: list): |
| """Send LwM2M Write-Attributes to given path |
| example: |
| leshan.remove_attributes(endpoint, '1/2/3, ['pmin', 'pmax']) |
| """ |
| attrs = '&'.join(attributes) |
| return self.put_raw(f'/clients/{endpoint}/{path}/attributes?'+ attrs) |
| |
| def update_obj_instance(self, endpoint: str, path: str, resources: dict): |
| """Update object instance""" |
| data = self._define_obj_inst(path, resources) |
| return self.put(f'/clients/{endpoint}/{path}', data, uri_options='&replace=false') |
| |
| def replace_obj_instance(self, endpoint: str, path: str, resources: dict): |
| """Replace object instance""" |
| data = self._define_obj_inst(path, resources) |
| return self.put(f'/clients/{endpoint}/{path}', data, uri_options='&replace=true') |
| |
| def create_obj_instance(self, endpoint: str, path: str, resources: dict): |
| """Send LwM2M CREATE command""" |
| data = self._define_obj_inst(path, resources) |
| path = '/'.join(path.split('/')[:-1]) # Create call should not have instance ID in path |
| return self.post(f'/clients/{endpoint}/{path}', data) |
| |
| @classmethod |
| def _type_to_string(cls, value): |
| """ |
| Convert a Python value to its corresponding Leshan representation. |
| |
| Parameters: |
| - value: The value to be converted. |
| |
| Returns: |
| - str: The string representation of the value. |
| """ |
| if isinstance(value, bool): |
| return 'boolean' |
| if isinstance(value, int): |
| return 'integer' |
| if isinstance(value, datetime): |
| return 'time' |
| return 'string' |
| |
| @classmethod |
| def _convert_type(cls, value): |
| """Wrapper for special types that are not understood by Json""" |
| if isinstance(value, datetime): |
| return int(value.timestamp()) |
| else: |
| return value |
| |
| @classmethod |
| def _define_obj_inst(cls, path: str, resources: dict): |
| """Define an object instance for Leshan""" |
| data = { |
| "kind": "instance", |
| "id": int(path.split('/')[-1]), # ID is last element of path |
| "resources": [] |
| } |
| for key, value in resources.items(): |
| if isinstance(value, dict): |
| kind = 'multiResource' |
| else: |
| kind = 'singleResource' |
| data['resources'].append(cls._define_resource(key, value, kind)) |
| return data |
| |
| @classmethod |
| def _define_resource(cls, rid, value, kind='singleResource'): |
| """Define a resource for Leshan""" |
| if kind in ('singleResource', 'resourceInstance'): |
| return { |
| "id": rid, |
| "kind": kind, |
| "value": cls._convert_type(value), |
| "type": cls._type_to_string(value) |
| } |
| if kind == 'multiResource': |
| return { |
| "id": rid, |
| "kind": kind, |
| "values": value, |
| "type": cls._type_to_string(list(value.values())[0]) |
| } |
| raise RuntimeError(f'Unhandled type {kind}') |
| |
| @classmethod |
| def _decode_value(cls, val_type: str, value: str): |
| """ |
| Decode the Leshan representation of a value back to a Python value. |
| """ |
| if val_type == 'BOOLEAN': |
| return bool(value) |
| if val_type == 'INTEGER': |
| return int(value) |
| return value |
| |
| @classmethod |
| def _decode_resource(cls, content: dict): |
| """ |
| Decode the Leshan representation of a resource back to a Python dictionary. |
| """ |
| if content['kind'] == 'singleResource' or content['kind'] == 'resourceInstance': |
| return {content['id']: cls._decode_value(content['type'], content['value'])} |
| elif content['kind'] == 'multiResource': |
| values = {} |
| for riid, value in content['values'].items(): |
| values.update({int(riid): cls._decode_value(content['type'], value)}) |
| return {content['id']: values} |
| raise RuntimeError(f'Unhandled type {content["kind"]}') |
| |
| @classmethod |
| def _decode_obj_inst(cls, content): |
| """ |
| Decode the Leshan representation of an object instance back to a Python dictionary. |
| """ |
| resources = {} |
| for resource in content['resources']: |
| resources.update(cls._decode_resource(resource)) |
| return {content['id']: resources} |
| |
| @classmethod |
| def _decode_obj(cls, content): |
| """ |
| Decode the Leshan representation of an object back to a Python dictionary. |
| """ |
| instances = {} |
| for instance in content['instances']: |
| instances.update(cls._decode_obj_inst(instance)) |
| return {content['id']: instances} |
| |
| def read(self, endpoint: str, path: str): |
| """Send LwM2M READ command and decode the response to a Python dictionary""" |
| resp = self.get(f'/clients/{endpoint}/{path}') |
| if not resp['success']: |
| return resp |
| content = resp['content'] |
| if content['kind'] == 'obj': |
| return self._decode_obj(content) |
| elif content['kind'] == 'instance': |
| return self._decode_obj_inst(content) |
| elif content['kind'] == 'singleResource' or content['kind'] == 'resourceInstance': |
| return self._decode_value(content['type'], content['value']) |
| elif content['kind'] == 'multiResource': |
| return self._decode_resource(content) |
| raise RuntimeError(f'Unhandled type {content["kind"]}') |
| |
| @classmethod |
| def parse_composite(cls, payload: dict): |
| """Decode the Leshan's response to composite query back to a Python dictionary""" |
| data = {} |
| if 'status' in payload: |
| if payload['status'] != 'CONTENT(205)' or 'content' not in payload: |
| raise RuntimeError(f'No content received') |
| payload = payload['content'] |
| for path, content in payload.items(): |
| if path == "/": |
| for obj in content['objects']: |
| data.update(cls._decode_obj(obj)) |
| continue |
| keys = [int(key) for key in path.lstrip("/").split('/')] |
| if len(keys) == 1: |
| data.update(cls._decode_obj(content)) |
| elif len(keys) == 2: |
| if keys[0] not in data: |
| data[keys[0]] = {} |
| data[keys[0]].update(cls._decode_obj_inst(content)) |
| elif len(keys) == 3: |
| if keys[0] not in data: |
| data[keys[0]] = {} |
| if keys[1] not in data[keys[0]]: |
| data[keys[0]][keys[1]] = {} |
| data[keys[0]][keys[1]].update(cls._decode_resource(content)) |
| elif len(keys) == 4: |
| if keys[0] not in data: |
| data[keys[0]] = {} |
| if keys[1] not in data[keys[0]]: |
| data[keys[0]][keys[1]] = {} |
| if keys[2] not in data[keys[0]][keys[1]]: |
| data[keys[0]][keys[1]][keys[2]] = {} |
| data[keys[0]][keys[1]][keys[2]].update(cls._decode_resource(content)) |
| else: |
| raise RuntimeError(f'Unhandled path {path}') |
| return data |
| |
| def _composite_params(self, paths: list[str] | None = None): |
| """Common URI parameters for composite query""" |
| parameters = { |
| 'pathformat': self.format, |
| 'nodeformat': self.format, |
| 'timeout': self.timeout |
| } |
| if paths is not None: |
| paths = [path if path.startswith('/') else '/' + path for path in paths] |
| parameters['paths'] = ','.join(paths) |
| |
| return parameters |
| |
| def composite_read(self, endpoint: str, paths: list[str]): |
| """Send LwM2M Composite-Read command and decode the response to a Python dictionary""" |
| parameters = self._composite_params(paths) |
| resp = self._s.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout) |
| payload = Leshan.handle_response(resp) |
| return self.parse_composite(payload) |
| |
| def composite_write(self, endpoint: str, resources: dict): |
| """ |
| Send LwM2m Composite-Write operation. |
| |
| Targeted resources are defined as a dictionary with the following structure: |
| { |
| "/1/0/1": 60, |
| "/1/0/6": True, |
| "/16/0/0": { |
| "0": "aa", |
| "1": "bb", |
| "2": "cc", |
| "3": "dd" |
| } |
| } |
| |
| Objects or object instances cannot be targeted. |
| """ |
| data = { } |
| parameters = self._composite_params() |
| for path, value in resources.items(): |
| path = path if path.startswith('/') else '/' + path |
| level = len(path.split('/')) - 1 |
| rid = int(path.split('/')[-1]) |
| if level == 3: |
| if isinstance(value, dict): |
| value = self._define_resource(rid, value, kind='multiResource') |
| else: |
| value = self._define_resource(rid, value) |
| elif level == 4: |
| value = self._define_resource(rid, value, kind='resourceInstance') |
| else: |
| raise RuntimeError(f'Unhandled path {path}') |
| data[path] = value |
| |
| resp = self._s.put(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, json=data, timeout=self.timeout) |
| return Leshan.handle_response(resp) |
| |
| def discover(self, endpoint: str, path: str): |
| resp = self.handle_response(self._s.get(f'{self.api_url}/clients/{endpoint}/{path}/discover', timeout=self.timeout)) |
| data = {} |
| for obj in resp['objectLinks']: |
| data[obj['url']] = obj['attributes'] |
| return data |
| |
| def create_psk_device(self, endpoint: str, passwd: str): |
| psk = binascii.b2a_hex(passwd.encode()).decode() |
| self.put('/security/clients/', f'{{"endpoint":"{endpoint}","tls":{{"mode":"psk","details":{{"identity":"{endpoint}","key":"{psk}"}} }} }}') |
| |
| def delete_device(self, endpoint: str): |
| self.delete_raw(f'/security/clients/{endpoint}') |
| |
| def create_bs_device(self, endpoint: str, server_uri: str, bs_passwd: str, passwd: str): |
| psk = binascii.b2a_hex(bs_passwd.encode()).decode() |
| data = f'{{"tls":{{"mode":"psk","details":{{"identity":"{endpoint}","key":"{psk}"}}}},"endpoint":"{endpoint}"}}' |
| self.put('/security/clients/', data) |
| ep = str([ord(n) for n in endpoint]) |
| key = str([ord(n) for n in passwd]) |
| content = '{"servers":{"0":{"binding":"U","defaultMinPeriod":1,"lifetime":86400,"notifIfDisabled":false,"shortId":1}},"security":{"1":{"bootstrapServer":false,"clientOldOffTime":1,"publicKeyOrId":' + ep + ',"secretKey":' + key + ',"securityMode":"PSK","serverId":1,"serverSmsNumber":"","smsBindingKeyParam":[],"smsBindingKeySecret":[],"smsSecurityMode":"NO_SEC","uri":"'+server_uri+'"}},"oscore":{},"toDelete":["/0","/1"]}' |
| self.post(f'/bootstrap/{endpoint}', content) |
| |
| def delete_bs_device(self, endpoint: str): |
| self.delete_raw(f'/security/clients/{endpoint}') |
| self.delete_raw(f'/bootstrap/{endpoint}') |
| |
| def observe(self, endpoint: str, path: str): |
| return self.post(f'/clients/{endpoint}/{path}/observe', data="") |
| |
| def cancel_observe(self, endpoint: str, path: str): |
| return self.delete_raw(f'/clients/{endpoint}/{path}/observe?active') |
| |
| def passive_cancel_observe(self, endpoint: str, path: str): |
| return self.delete_raw(f'/clients/{endpoint}/{path}/observe') |
| |
| def composite_observe(self, endpoint: str, paths: list[str]): |
| parameters = self._composite_params(paths) |
| resp = self._s.post(f'{self.api_url}/clients/{endpoint}/composite/observe', params=parameters, timeout=self.timeout) |
| payload = Leshan.handle_response(resp) |
| return self.parse_composite(payload) |
| |
| def cancel_composite_observe(self, endpoint: str, paths: list[str]): |
| paths = [path if path.startswith('/') else '/' + path for path in paths] |
| return self.delete_raw(f'/clients/{endpoint}/composite/observe?paths=' + ','.join(paths) + '&active') |
| |
| def passive_cancel_composite_observe(self, endpoint: str, paths: list[str]): |
| paths = [path if path.startswith('/') else '/' + path for path in paths] |
| return self.delete_raw(f'/clients/{endpoint}/composite/observe?paths=' + ','.join(paths)) |
| |
| @contextmanager |
| def get_event_stream(self, endpoint: str, timeout: int = None): |
| """ |
| Get stream of events regarding the given endpoint. |
| |
| Events are notifications, updates and sends. |
| |
| The event stream must be closed after the use, so this must be used in 'with' statement like this: |
| with leshan.get_event_stream('native_posix') as events: |
| data = events.next_event('SEND') |
| |
| If timeout happens, the event streams returns None. |
| """ |
| if timeout is None: |
| timeout = self.timeout |
| r = requests.get(f'{self.api_url}/event?{endpoint}', stream=True, headers={'Accept': 'text/event-stream'}, timeout=timeout) |
| if r.encoding is None: |
| r.encoding = 'utf-8' |
| try: |
| yield LeshanEventsIterator(r, timeout) |
| finally: |
| r.close() |
| |
| class LeshanEventsIterator: |
| """Iterator for Leshan event stream""" |
| def __init__(self, req: requests.Response, timeout: int): |
| """Initialize the iterator in line mode""" |
| self._it = req.iter_lines(chunk_size=1, decode_unicode=True) |
| self._timeout = timeout |
| |
| def next_event(self, event: str): |
| """ |
| Finds the next occurrence of a specific event in the stream. |
| |
| If timeout happens, the returns None. |
| """ |
| timeout = time.time() + self._timeout |
| try: |
| for line in self._it: |
| if line == f'event: {event}': |
| for line in self._it: |
| if not line.startswith('data: '): |
| continue |
| data = json.loads(line.lstrip('data: ')) |
| if event == 'SEND' or (event == 'NOTIFICATION' and data['kind'] == 'composite'): |
| return Leshan.parse_composite(data['val']) |
| if event == 'NOTIFICATION': |
| d = {data['res']: data['val']} |
| return Leshan.parse_composite(d) |
| return data |
| if time.time() > timeout: |
| return None |
| except requests.exceptions.Timeout: |
| return None |