blob: 54b057318baeda0b7edb7db428b7a33d180583ba [file] [log] [blame]
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""pw_console test mode functions."""
import asyncio
import time
import re
import random
import logging
from threading import Thread
from typing import Dict, List, Tuple
# Name of the logger that receives the generated fake device messages.
FAKE_DEVICE_LOGGER_NAME = 'pw_console_fake_device'
# Root logger (an empty name resolves to the root logger).
_ROOT_LOG = logging.getLogger('')
# Logger that log_forever() emits the fake device messages on.
_FAKE_DEVICE_LOG = logging.getLogger(FAKE_DEVICE_LOGGER_NAME)
def start_fake_logger(lines, log_thread_entry, log_thread_loop):
    """Start background fake log generation for test mode.

    Args:
      lines: Iterable of text lines to use as fake log messages; passed to
        prepare_fake_logs().
      log_thread_entry: Callable run as the target of a new daemon thread.
        NOTE(review): presumably this runs log_thread_loop - confirm against
        the caller.
      log_thread_loop: asyncio event loop that log_forever() is scheduled on.

    Returns:
      The future returned by asyncio.run_coroutine_threadsafe() for the
      scheduled log_forever() coroutine.
    """
    messages = prepare_fake_logs(lines)

    # Launch the thread first; the coroutine is submitted to the loop after.
    Thread(target=log_thread_entry, daemon=True).start()

    # Hand the log generator coroutine over to the given event loop from this
    # (different) thread.
    return asyncio.run_coroutine_threadsafe(
        log_forever(messages), log_thread_loop)  # type: ignore
def prepare_fake_logs(lines) -> List[Tuple[str, Dict]]:
    """Convert text lines into (message, metadata) tuples for fake logging.

    Falsy lines (e.g. empty strings) are dropped. Each remaining line is paired
    with a dict holding a single 'keys' entry: the text inside the first
    :kbd:`...` role found in the line, or '' if the line has none.

    Args:
      lines: Iterable of strings.

    Returns:
      A list of (line, {'keys': key_text}) tuples in input order.
    """
    kbd_pattern = re.compile(r':kbd:`(?P<key>[^`]+)`')

    def _metadata(text) -> Dict:
        # Only the first :kbd: role in the line is captured.
        found = kbd_pattern.search(text)
        return {'keys': found.group('key') if found else ''}

    return [(text, _metadata(text)) for text in lines if text]
async def log_forever(fake_log_messages: List[Tuple[str, Dict]]):
    """Test mode async log generator coroutine that runs forever.

    Repeatedly picks a random entry from fake_log_messages and emits it at INFO
    level on the fake device logger. Each record carries an
    'extra_metadata_fields' dict with a fake module name (cycled from a fixed
    list), a fixed file name, the seconds elapsed since the coroutine started
    and the entry's own metadata fields.

    Args:
      fake_log_messages: (message, extra_fields) tuples, e.g. the output of
        prepare_fake_logs().

    Returns:
      Only if fake_log_messages is empty, in which case a warning is logged
      and nothing is emitted. Otherwise this coroutine never returns.
    """
    _ROOT_LOG.info('Fake log device connected.')

    # random.choice() raises IndexError on an empty sequence; that error would
    # only surface through the background task's future, so bail out loudly.
    if not fake_log_messages:
        _ROOT_LOG.warning(
            'No fake log messages available; fake logging stopped.')
        return

    # Monotonic clock: elapsed timestamps must not jump if the wall clock is
    # adjusted while the generator is running.
    start_time = time.monotonic()
    message_count = 0

    # Fake module column names.
    module_names = ['APP', 'RADIO', 'BAT', 'USB', 'CPU']
    while True:
        # Pause before the first two messages and before every message after
        # the 33rd; messages in between are emitted back to back as a burst.
        if message_count > 32 or message_count < 2:
            await asyncio.sleep(.1)
        message, extra_fields = fake_log_messages_entry = random.choice(
            fake_log_messages)[:2]

        module_name = module_names[message_count % len(module_names)]
        _FAKE_DEVICE_LOG.info(
            message,
            extra=dict(extra_metadata_fields=dict(
                module=module_name,
                file='fake_app.cc',
                timestamp=time.monotonic() - start_time,
                **extra_fields)))
        message_count += 1