| #!/usr/bin/env python3 |
| # Copyright 2021 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Tests for the thread analyzer.""" |
| |
| import unittest |
| from pw_thread.thread_analyzer import ThreadInfo, ThreadSnapshotAnalyzer |
| from pw_thread_protos import thread_pb2 |
| |
| |
| class ThreadInfoTest(unittest.TestCase): |
| """Tests that the ThreadInfo class produces expected results.""" |
| def test_empty_thread(self): |
| thread_info = ThreadInfo(thread_pb2.Thread()) |
| expected = '\n'.join( |
| ('Est CPU usage: unknown', 'Stack info', |
| ' Current usage: 0x???????? - 0x???????? (size unknown)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x???????? - 0x???????? (size unknown)')) |
| self.assertFalse(thread_info.has_stack_size_limit()) |
| self.assertFalse(thread_info.has_stack_used()) |
| self.assertEqual(expected, str(thread_info)) |
| |
| def test_thread_with_cpu_usage(self): |
| thread = thread_pb2.Thread() |
| thread.cpu_usage_hundredths = 1234 |
| thread_info = ThreadInfo(thread) |
| |
| expected = '\n'.join( |
| ('Est CPU usage: 12.34%', 'Stack info', |
| ' Current usage: 0x???????? - 0x???????? (size unknown)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x???????? - 0x???????? (size unknown)')) |
| self.assertFalse(thread_info.has_stack_size_limit()) |
| self.assertFalse(thread_info.has_stack_used()) |
| self.assertEqual(expected, str(thread_info)) |
| |
| def test_thread_with_stack_pointer(self): |
| thread = thread_pb2.Thread() |
| thread.stack_pointer = 0x5AC6A86C |
| thread_info = ThreadInfo(thread) |
| |
| expected = '\n'.join( |
| ('Est CPU usage: unknown', 'Stack info', |
| ' Current usage: 0x???????? - 0x5ac6a86c (size unknown)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x???????? - 0x???????? (size unknown)')) |
| self.assertFalse(thread_info.has_stack_size_limit()) |
| self.assertFalse(thread_info.has_stack_used()) |
| self.assertEqual(expected, str(thread_info)) |
| |
| def test_thread_with_stack_usage(self): |
| thread = thread_pb2.Thread() |
| thread.stack_start_pointer = 0x5AC6B86C |
| thread.stack_pointer = 0x5AC6A86C |
| thread_info = ThreadInfo(thread) |
| |
| expected = '\n'.join( |
| ('Est CPU usage: unknown', 'Stack info', |
| ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x5ac6b86c - 0x???????? (size unknown)')) |
| self.assertFalse(thread_info.has_stack_size_limit()) |
| self.assertTrue(thread_info.has_stack_used()) |
| self.assertEqual(expected, str(thread_info)) |
| |
| def test_thread_with_all_stack_info(self): |
| thread = thread_pb2.Thread() |
| thread.stack_start_pointer = 0x5AC6B86C |
| thread.stack_end_pointer = 0x5AC6986C |
| thread.stack_pointer = 0x5AC6A86C |
| thread_info = ThreadInfo(thread) |
| |
| expected = '\n'.join( |
| ('Est CPU usage: unknown', 'Stack info', |
| ' Current usage: 0x5ac6b86c - 0x5ac6a86c (4096 bytes, 50.00%)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x5ac6b86c - 0x5ac6986c (8192 bytes)')) |
| self.assertTrue(thread_info.has_stack_size_limit()) |
| self.assertTrue(thread_info.has_stack_used()) |
| self.assertEqual(expected, str(thread_info)) |
| |
| |
| class ThreadSnapshotAnalyzerTest(unittest.TestCase): |
| """Tests that the ThreadSnapshotAnalyzer class produces expected results.""" |
| def test_no_threads(self): |
| analyzer = ThreadSnapshotAnalyzer(thread_pb2.SnapshotThreadInfo()) |
| self.assertEqual('', str(analyzer)) |
| |
| def test_one_empty_thread(self): |
| snapshot = thread_pb2.SnapshotThreadInfo() |
| snapshot.threads.append(thread_pb2.Thread()) |
| expected = '\n'.join(( |
| 'Thread State', |
| ' 1 thread running.', |
| '', |
| 'Thread (UNKNOWN): [unnamed thread]', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x???????? - 0x???????? (size unknown)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x???????? - 0x???????? (size unknown)', |
| '', |
| )) |
| analyzer = ThreadSnapshotAnalyzer(snapshot) |
| self.assertEqual(analyzer.active_thread(), None) |
| self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) |
| |
| def test_two_threads(self): |
| """Ensures multiple threads are printed correctly.""" |
| snapshot = thread_pb2.SnapshotThreadInfo() |
| |
| temp_thread = thread_pb2.Thread() |
| temp_thread.name = 'Idle'.encode() |
| temp_thread.state = thread_pb2.ThreadState.Enum.READY |
| temp_thread.stack_start_pointer = 0x2001ac00 |
| temp_thread.stack_end_pointer = 0x2001aa00 |
| temp_thread.stack_pointer = 0x2001ab0c |
| temp_thread.stack_pointer_est_peak = 0x2001aa00 |
| snapshot.threads.append(temp_thread) |
| |
| temp_thread = thread_pb2.Thread() |
| temp_thread.name = 'Alice'.encode() |
| temp_thread.stack_start_pointer = 0x2001b000 |
| temp_thread.stack_pointer = 0x2001ae20 |
| temp_thread.state = thread_pb2.ThreadState.Enum.BLOCKED |
| snapshot.threads.append(temp_thread) |
| |
| expected = '\n'.join(( |
| 'Thread State', |
| ' 2 threads running.', |
| '', |
| 'Thread (READY): Idle', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)', |
| ' Est peak usage: 512 bytes, 100.00%', |
| ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)', |
| '', |
| 'Thread (BLOCKED): Alice', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x2001b000 - 0x???????? (size unknown)', |
| '', |
| )) |
| analyzer = ThreadSnapshotAnalyzer(snapshot) |
| self.assertEqual(analyzer.active_thread(), None) |
| self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) |
| |
| def test_interrupts_with_thread(self): |
| """Ensures interrupts are properly reported as active.""" |
| snapshot = thread_pb2.SnapshotThreadInfo() |
| |
| temp_thread = thread_pb2.Thread() |
| temp_thread.name = 'Idle'.encode() |
| temp_thread.state = thread_pb2.ThreadState.Enum.READY |
| temp_thread.stack_start_pointer = 0x2001ac00 |
| temp_thread.stack_end_pointer = 0x2001aa00 |
| temp_thread.stack_pointer = 0x2001ab0c |
| temp_thread.stack_pointer_est_peak = 0x2001aa00 |
| snapshot.threads.append(temp_thread) |
| |
| temp_thread = thread_pb2.Thread() |
| temp_thread.name = 'Main/Handler'.encode() |
| temp_thread.stack_start_pointer = 0x2001b000 |
| temp_thread.stack_pointer = 0x2001ae20 |
| temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER |
| snapshot.threads.append(temp_thread) |
| |
| expected = '\n'.join(( |
| 'Thread State', |
| ' 2 threads running, Main/Handler active at the time of capture.', |
| ' ~~~~~~~~~~~~', |
| '', |
| # Ensure the active thread is moved to the top of the list. |
| 'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)', |
| ' Est peak usage: size unknown', |
| ' Stack limits: 0x2001b000 - 0x???????? (size unknown)', |
| '', |
| 'Thread (READY): Idle', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)', |
| ' Est peak usage: 512 bytes, 100.00%', |
| ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)', |
| '', |
| )) |
| analyzer = ThreadSnapshotAnalyzer(snapshot) |
| self.assertEqual(analyzer.active_thread(), temp_thread) |
| self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) |
| |
| def test_active_thread(self): |
| """Ensures the 'active' thread is highlighted.""" |
| snapshot = thread_pb2.SnapshotThreadInfo() |
| |
| temp_thread = thread_pb2.Thread() |
| temp_thread.name = 'Idle'.encode() |
| temp_thread.state = thread_pb2.ThreadState.Enum.READY |
| temp_thread.stack_start_pointer = 0x2001ac00 |
| temp_thread.stack_end_pointer = 0x2001aa00 |
| temp_thread.stack_pointer = 0x2001ab0c |
| temp_thread.stack_pointer_est_peak = 0x2001ac00 + 0x100 |
| snapshot.threads.append(temp_thread) |
| |
| temp_thread = thread_pb2.Thread() |
| temp_thread.name = 'Main/Handler'.encode() |
| temp_thread.active = True |
| temp_thread.stack_start_pointer = 0x2001b000 |
| temp_thread.stack_pointer = 0x2001ae20 |
| temp_thread.stack_pointer_est_peak = 0x2001b000 + 0x200 |
| temp_thread.state = thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER |
| snapshot.threads.append(temp_thread) |
| |
| expected = '\n'.join(( |
| 'Thread State', |
| ' 2 threads running, Main/Handler active at the time of capture.', |
| ' ~~~~~~~~~~~~', |
| '', |
| # Ensure the active thread is moved to the top of the list. |
| 'Thread (INTERRUPT_HANDLER): Main/Handler <-- [ACTIVE]', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x2001b000 - 0x2001ae20 (480 bytes)', |
| ' Est peak usage: 512 bytes', |
| ' Stack limits: 0x2001b000 - 0x???????? (size unknown)', |
| '', |
| 'Thread (READY): Idle', |
| 'Est CPU usage: unknown', |
| 'Stack info', |
| ' Current usage: 0x2001ac00 - 0x2001ab0c (244 bytes, 47.66%)', |
| ' Est peak usage: 256 bytes, 50.00%', |
| ' Stack limits: 0x2001ac00 - 0x2001aa00 (512 bytes)', |
| '', |
| )) |
| analyzer = ThreadSnapshotAnalyzer(snapshot) |
| |
| # Ensure the active thread is found. |
| self.assertEqual(analyzer.active_thread(), temp_thread) |
| self.assertEqual(str(ThreadSnapshotAnalyzer(snapshot)), expected) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |