# blob: de8486a5968a1e812977f814831309471c93ce17 [file] [log] [blame]
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Pigweed Console progress bar task state."""
from contextvars import ContextVar
import copy
from dataclasses import dataclass, field
import signal
from typing import Dict, Optional, Union
from prompt_toolkit.application import get_app_or_none
from prompt_toolkit.shortcuts import ProgressBar
from prompt_toolkit.shortcuts.progress_bar import formatters
from pw_console.progress_bar.progress_bar_impl import (
IterationsPerSecondIfNotHidden,
ProgressBarImpl,
TextIfNotHidden,
TimeLeftIfNotHidden,
)
from pw_console.progress_bar.progress_bar_task_counter import (
ProgressBarTaskCounter)
from pw_console.style import generate_styles
# Column layout shared by every progress bar created in this module. The
# entries are rendered left to right in the order listed here.
CUSTOM_FORMATTERS = [
    # Task label.
    formatters.Label(suffix=': '),
    # Rainbow-colored bar drawn with custom start/fill/head characters.
    formatters.Rainbow(
        formatters.Bar(
            start='|Pigw',
            end='|',
            sym_a='e',
            sym_b='d!',
            sym_c=' ',
        )
    ),
    formatters.Text(' '),
    # Progress count followed by the percentage in square brackets.
    formatters.Progress(),
    formatters.Text(' ['),
    formatters.Percentage(),
    formatters.Text('] in '),
    # Elapsed time.
    formatters.TimeElapsed(),
    # Rate and ETA, wrapped in parentheses; these use the "IfNotHidden"
    # formatter variants imported from progress_bar_impl.
    TextIfNotHidden(' ('),
    IterationsPerSecondIfNotHidden(),
    TextIfNotHidden('/s, eta: '),
    TimeLeftIfNotHidden(),
    TextIfNotHidden(')'),
]
def prompt_toolkit_app_running() -> bool:
    """Return True if get_app_or_none() yields a (truthy) application."""
    return bool(get_app_or_none())
@dataclass
class ProgressBarState:
    """Pigweed Console wide state for all repl progress bars.

    An instance of this class is intended to be a global variable.
    """

    # Tracked progress bar tasks, keyed by task name.
    tasks: Dict[str, ProgressBarTaskCounter] = field(default_factory=dict)
    # The progress bar object in use; None until
    # startup_progress_bar_impl() creates one.
    instance: Optional[Union[ProgressBar, ProgressBarImpl]] = None

    def _install_sigint_handler(self) -> None:
        """Add ctrl-c handling if not running inside pw_console."""

        def handle_sigint(_signum, _frame):
            # Shut down the ProgressBar prompt_toolkit application before
            # propagating the interrupt. Only objects that provide
            # __exit__ are shut down this way.
            prog_bar = self.instance
            if prog_bar is not None and hasattr(prog_bar, '__exit__'):
                prog_bar.__exit__()
            raise KeyboardInterrupt

        signal.signal(signal.SIGINT, handle_sigint)

    def startup_progress_bar_impl(
            self) -> Union[ProgressBar, ProgressBarImpl]:
        """Return the progress bar instance, creating it on first use.

        When a prompt_toolkit application is already running, a
        ProgressBarImpl is created with that application's style.
        Otherwise a SIGINT handler is installed and a standalone
        prompt_toolkit ProgressBar is created and entered.

        Returns:
          The existing or newly created progress bar instance.
        """
        if self.instance:
            return self.instance

        prog_bar: Union[ProgressBar, ProgressBarImpl]
        # Look the application up exactly once so the object that was
        # checked is the same one whose style is read.
        running_app = get_app_or_none()
        if running_app:
            prog_bar = ProgressBarImpl(style=running_app.style,
                                       formatters=CUSTOM_FORMATTERS)
        else:
            self._install_sigint_handler()
            prog_bar = ProgressBar(style=generate_styles(),
                                   formatters=CUSTOM_FORMATTERS)
            # Start the ProgressBar prompt_toolkit application in a separate
            # thread.
            prog_bar.__enter__()

        self.instance = prog_bar
        return prog_bar

    def cleanup_finished_tasks(self) -> None:
        """Drop completed or canceled tasks and their counters."""
        # Iterate over a snapshot because entries are removed from
        # self.tasks inside the loop.
        for task_name, task in copy.copy(self.tasks).items():
            if not (task.completed or task.canceled):
                continue
            ptc = task.prompt_toolkit_counter
            self.tasks.pop(task_name, None)
            if (self.instance and self.instance.counters
                    and ptc in self.instance.counters):
                self.instance.counters.remove(ptc)

    @property
    def all_tasks_complete(self) -> bool:
        """True if every tracked task is completed or canceled.

        Reading this property also removes finished tasks via
        cleanup_finished_tasks(). The result reflects the task states
        from before that cleanup; it is True when no tasks are tracked.
        """
        # Evaluate before cleanup, which removes the finished entries.
        tasks_complete = all(task.completed or task.canceled
                             for task in self.tasks.values())
        self.cleanup_finished_tasks()
        return tasks_complete

    def cancel_all_tasks(self) -> None:
        """Forget all tasks and clear the progress bar's counters."""
        self.tasks = {}
        if self.instance is not None:
            self.instance.counters = []

    def get_container(self):
        """Return the instance's prompt_toolkit container, if it has one.

        Returns:
          The result of the instance's __pt_container__() method, or None
          when there is no instance or it lacks that method.
        """
        prog_bar = self.instance
        if prog_bar is not None and hasattr(prog_bar, '__pt_container__'):
            return prog_bar.__pt_container__()
        return None
# Context variable holding the progress bar state. The default
# ProgressBarState() is constructed once, when this module is imported, so
# every context that has not set the variable gets that same instance.
TASKS_CONTEXTVAR = ContextVar(
    'pw_console_progress_bar_tasks',
    default=ProgressBarState(),
)