pw_tokenizer: Fix _DatabasePath.__init__()

_DatabasePath.__init__() previously failed to extract the domain when
passed a Path() object instead of a str. The issue was originally
discovered when attempting to add the #metrics domain for loading
tokenized metrics.

This commit replaces test_token_domains() with four tests covering str
and Path inputs, with and without a domain, to expose the bug, and
applies the necessary fix: _DatabasePath.__init__() now handles every
_PathOrStr argument (str or Path) correctly. Support for open file
objects (the IO option of the old _PathOrFile type) is removed; callers
should pass the file's path instead.
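
For illustration, both of the following now load the same database (a
minimal sketch; tokens.elf and the metrics domain are hypothetical
names, and the database file must exist on disk):

    from pathlib import Path

    from pw_tokenizer import detokenize

    # Previously, only the str form extracted the '#metrics' domain
    # suffix; the Path form treated the suffix as part of the filename.
    detok_from_str = detokenize.AutoUpdatingDetokenizer(
        'tokens.elf#metrics', min_poll_period_s=0
    )
    detok_from_path = detokenize.AutoUpdatingDetokenizer(
        Path('tokens.elf#metrics'), min_poll_period_s=0
    )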

Additional clean-up is suggested in the bug below.

Bug: b/265334753
Change-Id: Ibced5abe15be4f9d4755e94dec5fe260ea6bbd87
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/126462
Commit-Queue: Wyatt Hepler <hepler@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Pigweed-Auto-Submit: Wyatt Hepler <hepler@google.com>

diff --git a/pw_tokenizer/py/detokenize_test.py b/pw_tokenizer/py/detokenize_test.py
index 6c8c731..df710c7 100755
--- a/pw_tokenizer/py/detokenize_test.py
+++ b/pw_tokenizer/py/detokenize_test.py
@@ -507,7 +507,7 @@
file.close()
detok = detokenize.AutoUpdatingDetokenizer(
- file, min_poll_period_s=0
+ file.name, min_poll_period_s=0
)
self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
@@ -524,21 +524,38 @@
finally:
os.unlink(file.name)

- def test_token_domains(self, _):
- """Tests that token domains can be parsed from input filename"""
- filename_and_domain = f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*'
- detok_with_domain = detokenize.AutoUpdatingDetokenizer(
- filename_and_domain, min_poll_period_s=0
+ def test_token_domain_in_str(self, _) -> None:
+ """Tests a str containing a domain"""
+ detok = detokenize.AutoUpdatingDetokenizer(
+ f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*', min_poll_period_s=0
)
self.assertEqual(
- len(detok_with_domain.database),
- TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS,
+ len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
)
+
+ def test_token_domain_in_path(self, _) -> None:
+ """Tests a Path() containing a domain"""
+ detok = detokenize.AutoUpdatingDetokenizer(
+ Path(f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*'), min_poll_period_s=0
+ )
+ self.assertEqual(
+ len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
+ )
+
+ def test_token_no_domain_in_str(self, _) -> None:
+ """Tests a str without a domain"""
detok = detokenize.AutoUpdatingDetokenizer(
str(ELF_WITH_TOKENIZER_SECTIONS_PATH), min_poll_period_s=0
)
self.assertEqual(len(detok.database), TOKENS_IN_ELF)

+ def test_token_no_domain_in_path(self, _) -> None:
+ """Tests a Path() without a domain"""
+ detok = detokenize.AutoUpdatingDetokenizer(
+ ELF_WITH_TOKENIZER_SECTIONS_PATH, min_poll_period_s=0
+ )
+ self.assertEqual(len(detok.database), TOKENS_IN_ELF)
+
def _next_char(message: bytes) -> bytes:
return bytes(b + 1 for b in message)
diff --git a/pw_tokenizer/py/pw_tokenizer/detokenize.py b/pw_tokenizer/py/pw_tokenizer/detokenize.py
index 29d08c1..fa60212 100755
--- a/pw_tokenizer/py/pw_tokenizer/detokenize.py
+++ b/pw_tokenizer/py/pw_tokenizer/detokenize.py
@@ -50,7 +50,6 @@
Dict,
List,
Iterable,
- IO,
Iterator,
Match,
NamedTuple,
@@ -319,7 +318,27 @@
return decode_and_detokenize


-_PathOrFile = Union[IO, str, Path]
+_PathOrStr = Union[Path, str]
+
+# TODO(b/265334753): Reuse this function in database.py:LoadTokenDatabases
+def _parse_domain(path: _PathOrStr) -> Tuple[Path, Optional[Pattern[str]]]:
+ """Extracts an optional domain regex pattern suffix from a path"""
+
+ if isinstance(path, Path):
+ path = str(path)
+
+ delimiters = path.count('#')
+
+ if delimiters == 0:
+ return Path(path), None
+
+ if delimiters == 1:
+ path, domain = path.split('#')
+ return Path(path), re.compile(domain)
+
+ raise ValueError(
+ f'Too many # delimiters. Expected 0 or 1, found {delimiters}'
+ )


class AutoUpdatingDetokenizer(Detokenizer):
@@ -328,18 +347,8 @@
class _DatabasePath:
"""Tracks the modified time of a path or file object."""
- def __init__(self, path: _PathOrFile) -> None:
- self.path: Path
- self.domain = None
- if isinstance(path, str):
- if path.count('#') == 1:
- path, domain = path.split('#')
- self.domain = re.compile(domain)
- self.path = Path(path)
- elif isinstance(path, Path):
- self.path = path
- else:
- self.path = Path(path.name)
+ def __init__(self, path: _PathOrStr) -> None:
+ self.path, self.domain = _parse_domain(path)
self._modified_time: Optional[float] = self._last_modified_time()

def updated(self) -> bool:
@@ -368,7 +377,7 @@
return database.load_token_database()

def __init__(
- self, *paths_or_files: _PathOrFile, min_poll_period_s: float = 1.0
+ self, *paths_or_files: _PathOrStr, min_poll_period_s: float = 1.0
) -> None:
self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
self.min_poll_period_s = min_poll_period_s
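
For reference, a standalone sketch of the parsing rules that the new
_parse_domain() helper implements (this mirrors the private function in
the diff above rather than importing it):

    import re
    from pathlib import Path
    from typing import Optional, Pattern, Tuple, Union

    def parse_domain(
        path: Union[Path, str]
    ) -> Tuple[Path, Optional[Pattern[str]]]:
        """Splits an optional '#<domain regex>' suffix off of a path."""
        path = str(path)
        delimiters = path.count('#')
        if delimiters == 0:
            return Path(path), None  # No domain suffix present.
        if delimiters == 1:
            path, domain = path.split('#')
            return Path(path), re.compile(domain)
        raise ValueError(
            f'Too many # delimiters. Expected 0 or 1, found {delimiters}'
        )

    assert parse_domain('tokens.elf') == (Path('tokens.elf'), None)

    path, domain = parse_domain(Path('tokens.elf#.*'))
    assert path == Path('tokens.elf') and domain.pattern == '.*'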