blob: 251b1be5642c9a429dd2a073532d9c1935b52272 [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Data frame selection utilities."""
import numpy as np # type: ignore
import memdf.name
import memdf.util.pretty
import memdf.util.config
from memdf import Config, ConfigDescription, DF
from typing import Mapping, Optional
def split_size(config: Config, key: str) -> None:
    """Separate `name:size` selection arguments into names and size limits.

    A program that supports a size threshold for selection or summary lets
    the threshold be specified per item as a suffix on the argument,
    e.g. `--section=.text:16K`.

    `key` is a dotted configuration key such as `col.select` that refers to
    a list of such arguments. After this call:

    - `col.select` holds only the names (the text before the first `:`);
    - `col.limit` holds a `{name: size}` dictionary with an entry for each
      argument that carried a suffix, where the size is the text between
      the first and second `:` converted by
      `memdf.util.config.parse_size`.
    """
    select_path = key.split('.')
    limit_path = [*select_path[:-1], 'limit']
    fields = [item.split(':') for item in config.getl(select_path, [])]

    # Store the bare names first, then the limits.
    config.putl(select_path, [name for name, *_ in fields])

    limits = {}
    for name, *sizes in fields:
        if sizes:
            limits[name] = memdf.util.config.parse_size(sizes[0])
    config.putl(limit_path, limits)
def get_limit(config: Config, column: str, name: str) -> int:
    """Return the size limit configured for `name` in `column`.

    Looks up `<column>.limit.<name>`; when that is not set, the value of
    `report.limit` is used, which itself defaults to 0.
    """
    fallback = config.get('report.limit', 0)
    return config.getl([column, 'limit', name], fallback)
def postprocess_selections(config: Config, key: str, info: Mapping) -> None:
    """Resolve select/ignore command options.

    First strips any `name:size` suffixes from the selection list (see
    `split_size`). Then, unless `<choice>.ignore-all` is set, every entry
    of `<choice>.default` that is not listed in `<choice>.ignore` is added
    to the selections. The result is stored back under `key` as a
    frozenset.

    Args:
        config: Configuration to read from and update.
        key: Dotted key of the form `<choice>.select`.
        info: Not used by this function; presumably required by the
            postprocess callback signature.

    The `<choice>.default` list held by `config` is left unmodified.
    """
    split_size(config, key)
    choice, select = key.split('.')
    assert select == 'select'
    selected = set(config.get(key))
    if not config.getl([choice, 'ignore-all'], False):
        if defaults := config.getl([choice, 'default']):
            # Filter into the new set instead of calling `defaults.remove()`:
            # that mutated the list owned by the configuration and dropped
            # only the first of any duplicated default. A missing ignore
            # list is treated as empty.
            ignored = frozenset(config.getl([choice, 'ignore']) or ())
            selected.update(d for d in defaults if d not in ignored)
    config.put(key, frozenset(selected))
def select_and_ignore_config_desc(key: str) -> ConfigDescription:
    """Build the select/ignore option descriptions for one kind of item.

    For a `key` such as 'section', the returned description places the
    options in the 'select' group and defines `<key>.select`,
    `<key>.select-all`, `<key>.ignore` and `<key>.ignore-all`.
    `<key>.select` is also reachable as `--<key>` and is postprocessed by
    `postprocess_selections`.
    """
    title = key.capitalize()

    description = {}
    description[Config.group_map(key)] = {'group': 'select'}
    description[key + '.select'] = {
        'help': f'{title}(s) to process; otherwise all not ignored',
        'metavar': 'NAME',
        'default': [],
        'argparse': {'alias': ['--' + key]},
        'postprocess': postprocess_selections,
    }
    description[key + '.select-all'] = {
        'help': f'Select all {key}s',
        'default': False,
    }
    description[key + '.ignore'] = {
        'help': f'{title}(s) to ignore',
        'metavar': 'NAME',
        'default': [],
    }
    description[key + '.ignore-all'] = {
        'help': f'Ignore all {key}s unless explicitly selected',
        'default': False,
    }
    return description
# Select/ignore option descriptions for each selectable kind of item.
SECTION_CONFIG = select_and_ignore_config_desc('section')
SYMBOL_CONFIG = select_and_ignore_config_desc('symbol')
REGION_CONFIG = select_and_ignore_config_desc('region')

# Combined description: the 'select' option group definition followed by
# the section, symbol and region options, in that order.
CONFIG: ConfigDescription = {
    Config.group_def('select'): {'title': 'selection options'},
}
CONFIG.update(SECTION_CONFIG)
CONFIG.update(SYMBOL_CONFIG)
CONFIG.update(REGION_CONFIG)
# Names of the columns that selection options exist for, split into a
# 'collected' and a 'synthetic' group. SELECTION_CHOICES lists both groups,
# collected first.
COLLECTED_CHOICES = ['symbol', 'section']
SYNTHETIC_CHOICES = ['region']
SELECTION_CHOICES = [*COLLECTED_CHOICES, *SYNTHETIC_CHOICES]
def is_selected(config: Config, column, name) -> bool:
    """Report whether `name` is selected for `column`.

    A name is selected when `<column>.select-all` is set, or when the name
    appears in `<column>.select` (treated as empty if not configured).
    """
    if config.getl([column, 'select-all']):
        return True
    return name in config.getl([column, 'select'], [])
def synthesize_region(config: Config, df: DF, column: str) -> DF:
    """Fill `column` with a region looked up from each row's 'section'.

    The lookup table is `config.transpose_dictlist()` applied to the
    `region.sections` configuration value (presumably turning a
    region -> sections mapping into section -> region; confirm against
    `Config`). Sections missing from the table get `memdf.name.UNKNOWN`.
    The table is also passed to `memdf.util.pretty.debug`.

    `df` is modified in place and returned.
    """
    section_regions = config.transpose_dictlist(
        config.get('region.sections', {}))
    memdf.util.pretty.debug(section_regions)

    def region_of(section):
        return section_regions.get(section, memdf.name.UNKNOWN)

    df[column] = df['section'].map(region_of)
    return df
def groupby_region(df: DF):
    """Filter a frame that has 'size' and 'region' columns.

    Keeps each row whose 'size' is greater than zero or whose 'region' is
    not `memdf.name.UNKNOWN`; in other words, only zero-or-negative-size
    rows for the unknown region are dropped.
    """
    has_size = df['size'] > 0
    known_region = df['region'] != memdf.name.UNKNOWN
    return df[has_size | known_region]
# Handlers for synthesized columns, keyed by column name. Each value is a
# pair: element 0 is called as f(config, df, column) to add the column to a
# frame; element 1 is called as f(df) on an aggregated frame.
SYNTHESIZE = {'region': (synthesize_region, groupby_region)}
def synthesize_column(config: Config, df: DF, column: str) -> DF:
    """Make sure `df` has `column`, synthesizing it if it is absent.

    A frame that already has the column is returned untouched. Otherwise
    the first handler registered for `column` in `SYNTHESIZE` is called
    with `(config, df, column)`; its return value is not used, so the
    handler is relied upon to update `df` in place. A column with no entry
    in `SYNTHESIZE` raises `KeyError`.
    """
    if column in df.columns:
        return df
    synthesizer = SYNTHESIZE[column][0]
    synthesizer(config, df, column)
    return df
def select_configured_column(config: Config, df: DF, column: str) -> DF:
    """Restrict `df` to rows whose `column` value is in the configured selection.

    The frame is returned unchanged when it has no such column, when
    `<column>.select-all` is set, or when `<column>.select` is empty or
    not configured.
    """
    if column not in df or config.getl([column, 'select-all']):
        return df
    wanted = config.getl([column, 'select'], [])
    if not wanted:
        return df
    return df.loc[df[column].isin(wanted)]
def select_configured(config: Config, df: DF, columns=SELECTION_CHOICES) -> DF:
    """Apply the configured selection for each of `columns` in turn.

    Each column is filtered with `select_configured_column`, feeding the
    result of one column into the next. `columns` defaults to
    `SELECTION_CHOICES`.
    """
    selected = df
    for name in columns:
        selected = select_configured_column(config, selected, name)
    return selected
def groupby(config: Config, df: DF, by: Optional[str] = None):
    """Sum the 'size' column of `df` for each distinct value of `by`.

    Args:
        config: Configuration; `report.by` supplies the grouping column
            when `by` is empty or None.
        df: Frame containing at least the `by` and 'size' columns.
        by: Name of the column to group on.

    Returns:
        A new frame with one row per group, holding the `by` column and the
        summed 'size'. When `by` has an entry in `SYNTHESIZE`, the second
        handler of that entry is applied to the result before returning.
    """
    if not by:
        by = config['report.by']
    # pandas already dispatches `aggregate(np.sum)` to its own group-wise
    # sum, and since 2.1 warns that this aliasing will be removed. Calling
    # `.sum()` directly gives the same result without the FutureWarning.
    df = df[[by, 'size']].groupby(by).sum().reset_index()
    if by in SYNTHESIZE:
        df = SYNTHESIZE[by][1](df)
    return df