# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Library to analyze and dump Thread protos and Thread snapshots into text."""

from typing import Optional, List, Mapping
import pw_tokenizer
from pw_symbolizer import LlvmSymbolizer, Symbolizer
from pw_tokenizer import proto as proto_detokenizer
from pw_thread_protos import thread_pb2

THREAD_STATE_TO_STRING: Mapping[int, str] = {
    thread_pb2.ThreadState.Enum.UNKNOWN: 'UNKNOWN',
    thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER: 'INTERRUPT_HANDLER',
    thread_pb2.ThreadState.Enum.RUNNING: 'RUNNING',
    thread_pb2.ThreadState.Enum.READY: 'READY',
    thread_pb2.ThreadState.Enum.SUSPENDED: 'SUSPENDED',
    thread_pb2.ThreadState.Enum.BLOCKED: 'BLOCKED',
    thread_pb2.ThreadState.Enum.INACTIVE: 'INACTIVE',
}


def process_snapshot(serialized_snapshot: bytes,
                     tokenizer_db: Optional[pw_tokenizer.Detokenizer] = None,
                     symbolizer: Optional[Symbolizer] = None) -> str:
    """Processes snapshot threads, producing a multi-line string."""
    captured_threads = thread_pb2.SnapshotThreadInfo()
    captured_threads.ParseFromString(serialized_snapshot)
    if symbolizer is None:
        symbolizer = LlvmSymbolizer()

    return str(
        ThreadSnapshotAnalyzer(captured_threads, tokenizer_db, symbolizer))


class ThreadInfo:
    """Provides CPU and stack information that can be inferred from a Thread."""

    _UNKNOWN_VALUE_STR = '0x' + '?' * 8

    def __init__(self, thread: thread_pb2.Thread):
        self._thread = thread

    def _cpu_used_str(self) -> str:
        if not self._thread.HasField('cpu_usage_hundredths'):
            return 'unknown'
        cpu_last_percent = (self._thread.cpu_usage_hundredths / 100)
        return f'{cpu_last_percent:.2f}%'

    def _stack_size_limit_limit_str(self) -> str:
        if not self.has_stack_size_limit():
            return 'size unknown'

        return f'{self.stack_size_limit()} bytes'

    def _stack_used_str(self) -> str:
        if not self.has_stack_used():
            return 'size unknown'

        used_str = f'{self.stack_used()} bytes'
        if not self.has_stack_size_limit():
            return used_str
        used_str += f', {100*self.stack_used()/self.stack_size_limit():.2f}%'
        return used_str

    def _stack_pointer_est_peak_str(self) -> str:
        if not self.has_stack_pointer_est_peak():
            return 'size unknown'

        high_used_str = f'{self.stack_pointer_est_peak()} bytes'
        if not self.has_stack_size_limit():
            return high_used_str
        high_water_mark_percent = (100 * self.stack_pointer_est_peak() /
                                   self.stack_size_limit())
        high_used_str += f', {high_water_mark_percent:.2f}%'
        return high_used_str

    def _stack_used_range_str(self) -> str:
        start_str = (f'0x{self._thread.stack_start_pointer:08x}'
                     if self._thread.HasField('stack_start_pointer') else
                     ThreadInfo._UNKNOWN_VALUE_STR)
        end_str = (f'0x{self._thread.stack_pointer:08x}'
                   if self._thread.HasField('stack_pointer') else
                   ThreadInfo._UNKNOWN_VALUE_STR)

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_used_str()})'

    def _stack_limit_range_str(self) -> str:
        start_str = (f'0x{self._thread.stack_start_pointer:08x}'
                     if self._thread.HasField('stack_start_pointer') else
                     ThreadInfo._UNKNOWN_VALUE_STR)
        end_str = (f'0x{self._thread.stack_end_pointer:08x}'
                   if self._thread.HasField('stack_end_pointer') else
                   ThreadInfo._UNKNOWN_VALUE_STR)

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_size_limit_limit_str()})'

    def _stack_pointer_str(self) -> str:
        return (f'0x{self._thread.stack_end_pointer:08x}'
                if self._thread.HasField('stack_pointer') else
                ThreadInfo._UNKNOWN_VALUE_STR)

    def has_stack_size_limit(self) -> bool:
        """Returns true if there's enough info to calculate stack size."""
        return (self._thread.HasField('stack_start_pointer')
                and self._thread.HasField('stack_end_pointer'))

    def stack_size_limit(self) -> int:
        """Returns the stack size limit in bytes.

        Precondition:
            has_stack_size_limit() must be true.
        """
        assert self.has_stack_size_limit(), 'Missing stack size information'
        return abs(self._thread.stack_start_pointer -
                   self._thread.stack_end_pointer)

    def has_stack_used(self) -> bool:
        """Returns true if there's enough info to calculate stack usage."""
        return (self._thread.HasField('stack_start_pointer')
                and self._thread.HasField('stack_pointer'))

    def stack_used(self) -> int:
        """Returns the stack usage in bytes.

        Precondition:
            has_stack_used() must be true.
        """
        assert self.has_stack_used(), 'Missing stack usage information'
        return abs(self._thread.stack_start_pointer -
                   self._thread.stack_pointer)

    def has_stack_pointer_est_peak(self) -> bool:
        """Returns true if there's enough info to calculate estimate
        used stack.
        """
        return (self._thread.HasField('stack_start_pointer')
                and self._thread.HasField('stack_pointer_est_peak'))

    def stack_pointer_est_peak(self) -> int:
        """Returns the max estimated used stack usage in bytes.

        Precondition:
            has_stack_estimated_used_bytes() must be true.
        """
        assert self.has_stack_pointer_est_peak(), 'Missing stack est. peak'
        return abs(self._thread.stack_start_pointer -
                   self._thread.stack_pointer_est_peak)

    def __str__(self) -> str:
        output = [
            f'Est CPU usage: {self._cpu_used_str()}',
            'Stack info',
            f'  Current usage:   {self._stack_used_range_str()}',
            f'  Est peak usage:  {self._stack_pointer_est_peak_str()}',
            f'  Stack limits:    {self._stack_limit_range_str()}',
        ]
        return '\n'.join(output)


class ThreadSnapshotAnalyzer:
    """This class simplifies dumping contents of a snapshot Metadata message."""
    def __init__(self,
                 threads: thread_pb2.SnapshotThreadInfo,
                 tokenizer_db: Optional[pw_tokenizer.Detokenizer] = None,
                 symbolizer: Optional[Symbolizer] = None):
        self._threads = threads.threads
        self._tokenizer_db = (tokenizer_db if tokenizer_db is not None else
                              pw_tokenizer.Detokenizer(None))
        if symbolizer is not None:
            self._symbolizer = symbolizer
        else:
            self._symbolizer = LlvmSymbolizer()

        for thread in self._threads:
            proto_detokenizer.detokenize_fields(self._tokenizer_db, thread)

    def active_thread(self) -> Optional[thread_pb2.Thread]:
        """The thread that requested the snapshot capture."""
        running_thread = None
        for thread in self._threads:
            if thread.active:
                return thread
            if thread.state == thread_pb2.ThreadState.Enum.RUNNING:
                running_thread = thread

        return running_thread

    def __str__(self) -> str:
        """outputs a pw.snapshot.Metadata proto as a multi-line string."""
        output: List[str] = []
        if not self._threads:
            return ''

        output.append('Thread State')
        plural = '' if len(self._threads) == 1 else 's'
        thread_state_overview = f'  {len(self._threads)} thread{plural} running'
        requesting_thread = self.active_thread()
        if not requesting_thread:
            thread_state_overview += '.'
            output.append(thread_state_overview)
        else:
            thread_state_overview += ', '
            underline = (' ' * len(thread_state_overview) +
                         '~' * len(requesting_thread.name.decode()))
            thread_state_overview += (f'{requesting_thread.name.decode()}'
                                      ' active at the time of capture.')
            output.append(thread_state_overview)
            output.append(underline)

        output.append('')

        # Put the active thread at the front.
        requesting_thread = self.active_thread()
        if requesting_thread is not None:
            self._threads.remove(requesting_thread)
            self._threads.insert(0, requesting_thread)

        for thread in self._threads:
            thread_name = thread.name.decode()
            if not thread_name:
                thread_name = '[unnamed thread]'
            thread_headline = ('Thread '
                               f'({THREAD_STATE_TO_STRING[thread.state]}): '
                               f'{thread_name}')
            if self.active_thread() == thread:
                thread_headline += ' <-- [ACTIVE]'
            output.append(thread_headline)
            output.append(str(ThreadInfo(thread)))
            if thread.raw_backtrace:
                output.append(
                    self._symbolizer.dump_stack_trace(thread.raw_backtrace))
            # Blank line between threads for nicer formatting.
            output.append('')

        return '\n'.join(output)
