blob: 37a3495706b69c6c5964e298c3374b5f1b3e891e [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Library to analyze and dump Thread protos and Thread snapshots into text."""
from typing import Optional, List, Mapping
import pw_tokenizer
from pw_symbolizer import LlvmSymbolizer, Symbolizer
from pw_tokenizer import proto as proto_detokenizer
from pw_thread_protos import thread_pb2
THREAD_STATE_TO_STRING: Mapping[int, str] = {
thread_pb2.ThreadState.Enum.UNKNOWN: 'UNKNOWN',
thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER: 'INTERRUPT_HANDLER',
thread_pb2.ThreadState.Enum.RUNNING: 'RUNNING',
thread_pb2.ThreadState.Enum.READY: 'READY',
thread_pb2.ThreadState.Enum.SUSPENDED: 'SUSPENDED',
thread_pb2.ThreadState.Enum.BLOCKED: 'BLOCKED',
thread_pb2.ThreadState.Enum.INACTIVE: 'INACTIVE',
}
def process_snapshot(serialized_snapshot: bytes,
tokenizer_db: Optional[pw_tokenizer.Detokenizer] = None,
symbolizer: Optional[Symbolizer] = None) -> str:
"""Processes snapshot threads, producing a multi-line string."""
captured_threads = thread_pb2.SnapshotThreadInfo()
captured_threads.ParseFromString(serialized_snapshot)
if symbolizer is None:
symbolizer = LlvmSymbolizer()
return str(
ThreadSnapshotAnalyzer(captured_threads, tokenizer_db, symbolizer))
class ThreadInfo:
"""Provides CPU and stack information that can be inferred from a Thread."""
_UNKNOWN_VALUE_STR = '0x' + '?' * 8
def __init__(self, thread: thread_pb2.Thread):
self._thread = thread
def _cpu_used_str(self) -> str:
if not self._thread.HasField('cpu_usage_hundredths'):
return 'unknown'
cpu_last_percent = (self._thread.cpu_usage_hundredths / 100)
return f'{cpu_last_percent:.2f}%'
def _stack_size_limit_limit_str(self) -> str:
if not self.has_stack_size_limit():
return 'size unknown'
return f'{self.stack_size_limit()} bytes'
def _stack_used_str(self) -> str:
if not self.has_stack_used():
return 'size unknown'
used_str = f'{self.stack_used()} bytes'
if not self.has_stack_size_limit():
return used_str
used_str += f', {100*self.stack_used()/self.stack_size_limit():.2f}%'
return used_str
def _stack_pointer_est_peak_str(self) -> str:
if not self.has_stack_pointer_est_peak():
return 'size unknown'
high_used_str = f'{self.stack_pointer_est_peak()} bytes'
if not self.has_stack_size_limit():
return high_used_str
high_water_mark_percent = (100 * self.stack_pointer_est_peak() /
self.stack_size_limit())
high_used_str += f', {high_water_mark_percent:.2f}%'
return high_used_str
def _stack_used_range_str(self) -> str:
start_str = (f'0x{self._thread.stack_start_pointer:08x}'
if self._thread.HasField('stack_start_pointer') else
ThreadInfo._UNKNOWN_VALUE_STR)
end_str = (f'0x{self._thread.stack_pointer:08x}'
if self._thread.HasField('stack_pointer') else
ThreadInfo._UNKNOWN_VALUE_STR)
# TODO(amontanez): Would be nice to represent stack growth direction.
return f'{start_str} - {end_str} ({self._stack_used_str()})'
def _stack_limit_range_str(self) -> str:
start_str = (f'0x{self._thread.stack_start_pointer:08x}'
if self._thread.HasField('stack_start_pointer') else
ThreadInfo._UNKNOWN_VALUE_STR)
end_str = (f'0x{self._thread.stack_end_pointer:08x}'
if self._thread.HasField('stack_end_pointer') else
ThreadInfo._UNKNOWN_VALUE_STR)
# TODO(amontanez): Would be nice to represent stack growth direction.
return f'{start_str} - {end_str} ({self._stack_size_limit_limit_str()})'
def _stack_pointer_str(self) -> str:
return (f'0x{self._thread.stack_end_pointer:08x}'
if self._thread.HasField('stack_pointer') else
ThreadInfo._UNKNOWN_VALUE_STR)
def has_stack_size_limit(self) -> bool:
"""Returns true if there's enough info to calculate stack size."""
return (self._thread.HasField('stack_start_pointer')
and self._thread.HasField('stack_end_pointer'))
def stack_size_limit(self) -> int:
"""Returns the stack size limit in bytes.
Precondition:
has_stack_size_limit() must be true.
"""
assert self.has_stack_size_limit(), 'Missing stack size information'
return abs(self._thread.stack_start_pointer -
self._thread.stack_end_pointer)
def has_stack_used(self) -> bool:
"""Returns true if there's enough info to calculate stack usage."""
return (self._thread.HasField('stack_start_pointer')
and self._thread.HasField('stack_pointer'))
def stack_used(self) -> int:
"""Returns the stack usage in bytes.
Precondition:
has_stack_used() must be true.
"""
assert self.has_stack_used(), 'Missing stack usage information'
return abs(self._thread.stack_start_pointer -
self._thread.stack_pointer)
def has_stack_pointer_est_peak(self) -> bool:
"""Returns true if there's enough info to calculate estimate
used stack.
"""
return (self._thread.HasField('stack_start_pointer')
and self._thread.HasField('stack_pointer_est_peak'))
def stack_pointer_est_peak(self) -> int:
"""Returns the max estimated used stack usage in bytes.
Precondition:
has_stack_estimated_used_bytes() must be true.
"""
assert self.has_stack_pointer_est_peak(), 'Missing stack est. peak'
return abs(self._thread.stack_start_pointer -
self._thread.stack_pointer_est_peak)
def __str__(self) -> str:
output = [
f'Est CPU usage: {self._cpu_used_str()}',
'Stack info',
f' Current usage: {self._stack_used_range_str()}',
f' Est peak usage: {self._stack_pointer_est_peak_str()}',
f' Stack limits: {self._stack_limit_range_str()}',
]
return '\n'.join(output)
class ThreadSnapshotAnalyzer:
"""This class simplifies dumping contents of a snapshot Metadata message."""
def __init__(self,
threads: thread_pb2.SnapshotThreadInfo,
tokenizer_db: Optional[pw_tokenizer.Detokenizer] = None,
symbolizer: Optional[Symbolizer] = None):
self._threads = threads.threads
self._tokenizer_db = (tokenizer_db if tokenizer_db is not None else
pw_tokenizer.Detokenizer(None))
if symbolizer is not None:
self._symbolizer = symbolizer
else:
self._symbolizer = LlvmSymbolizer()
for thread in self._threads:
proto_detokenizer.detokenize_fields(self._tokenizer_db, thread)
def active_thread(self) -> Optional[thread_pb2.Thread]:
"""The thread that requested the snapshot capture."""
# First check if an interrupt handler was active.
for thread in self._threads:
if thread.state == thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER:
return thread
if thread.active: # The deprecated legacy way to report this.
return thread
# If not, search for a running thread.
for thread in self._threads:
if thread.state == thread_pb2.ThreadState.Enum.RUNNING:
return thread
return None
def __str__(self) -> str:
"""outputs a pw.snapshot.Metadata proto as a multi-line string."""
output: List[str] = []
if not self._threads:
return ''
output.append('Thread State')
plural = '' if len(self._threads) == 1 else 's'
thread_state_overview = f' {len(self._threads)} thread{plural} running'
requesting_thread = self.active_thread()
if not requesting_thread:
thread_state_overview += '.'
output.append(thread_state_overview)
else:
thread_state_overview += ', '
underline = (' ' * len(thread_state_overview) +
'~' * len(requesting_thread.name.decode()))
thread_state_overview += (f'{requesting_thread.name.decode()}'
' active at the time of capture.')
output.append(thread_state_overview)
output.append(underline)
output.append('')
# Put the active thread at the front.
requesting_thread = self.active_thread()
if requesting_thread is not None:
self._threads.remove(requesting_thread)
self._threads.insert(0, requesting_thread)
for thread in self._threads:
thread_name = thread.name.decode()
if not thread_name:
thread_name = '[unnamed thread]'
thread_headline = ('Thread '
f'({THREAD_STATE_TO_STRING[thread.state]}): '
f'{thread_name}')
if self.active_thread() == thread:
thread_headline += ' <-- [ACTIVE]'
output.append(thread_headline)
output.append(str(ThreadInfo(thread)))
if thread.raw_backtrace:
output.append(
self._symbolizer.dump_stack_trace(thread.raw_backtrace))
# Blank line between threads for nicer formatting.
output.append('')
return '\n'.join(output)