# blob: 18cd0048669b3764627399e6218db8386a51005c [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collect information from various sources into Memory Map DataFrames."""
import bisect
from typing import Callable, Dict, List, Mapping, Optional, Sequence, Tuple
import memdf.collector.bloaty
import memdf.collector.csv
import memdf.collector.elftools
import memdf.collector.readelf
import memdf.collector.su
import memdf.name
import memdf.select
import memdf.util.config
import pandas as pd # type: ignore
from elftools.elf.constants import SH_FLAGS # type: ignore
from memdf import DF, Config, ConfigDescription, DFs, ExtentDF, SectionDF, SymbolDF
from memdf.collector.util import simplify_source
# Option description for 'collect.prefix', the list of path prefixes to strip
# from source file names. It is kept as a separate mapping and merged into
# CONFIG with `**`.
PREFIX_CONFIG: ConfigDescription = {
    'collect.prefix': {
        'help': 'Strip PATH from the beginning of source file names',
        'metavar': 'PATH',
        # Empty list: no prefixes are stripped unless some are supplied.
        'default': [],
        'argparse': {
            'alias': ['--prefix', '--strip-prefix'],
            # NOTE(review): presumably forwarded to argparse, where 'append'
            # accumulates repeated options into the list -- confirm against
            # memdf.util.config.
            'action': 'append',
        }
    },
}
# Combined option description for collection tools.
#
# The mapping is assembled with `**` unpacking, so if two sources define the
# same key the one that appears later in this literal wins.
CONFIG: ConfigDescription = {
    # NOTE(review): group_def()/group_map() are Config helpers defined outside
    # this file; presumably they produce the keys that declare option groups
    # and assign the 'collect' options to the 'input' group -- confirm.
    Config.group_def('input'): {
        'title': 'input options',
    },
    Config.group_def('tool'): {
        'title': 'external tool options',
    },
    Config.group_map('collect'): {
        'group': 'input'
    },
    # Options contributed by the individual collector modules.
    # NOTE(review): 'su' is an accepted method below but no
    # memdf.collector.su.CONFIG is merged here -- confirm that module
    # defines none.
    **memdf.collector.bloaty.CONFIG,
    **memdf.collector.csv.CONFIG,
    **memdf.collector.elftools.CONFIG,
    **memdf.collector.readelf.CONFIG,
    # Selects which entry of FILE_READERS is used to read each input.
    'collect.method': {
        'help':
        'Method of input processing: one of'
        ' elftools, readelf, bloaty, csv, tsv, su.',
        'metavar': 'METHOD',
        'choices': ['elftools', 'readelf', 'bloaty', 'csv', 'tsv', 'su'],
        'default': 'elftools',
        'argparse': {
            'alias': ['-f'],
        },
    },
    **PREFIX_CONFIG,
}
# Symbol names that postprocess_symbols() accepts as the first ARM mode
# marker. Once one has been seen, every later NOTYPE symbol whose name starts
# with '$' is treated as a marker as well.
ARM_SPECIAL_SYMBOLS = frozenset(["$a", "$t", "$t.x", "$d", "$d.realdata"])
def postprocess_symbols(config: Config, symbols: SymbolDF) -> SymbolDF:
    """Fold FILE and ARM mode marker rows into per-symbol columns.

    The table is scanned once in row order.  Each FILE row sets the
    "current file" for the rows that follow it, and each ARM mode marker
    sets the "current arm" value in the same way.

    If any FILE row was seen, a 'file' column is added and the FILE rows
    are dropped.  If any ARM mode marker was seen, an 'arm' column is added
    and every row whose name matches a seen marker is dropped.

    When the table has no 'type' column it is returned untouched.

    Args:
        config: Read for 'collect.prefix-file' and, when that is truthy,
            for the 'collect.prefix' patterns used to shorten file names.
        symbols: Symbol table.  The new columns are assigned on this
            object before the filtered result is produced.

    Returns:
        The symbol table without FILE rows and ARM mode marker rows.
    """
    prefixes = (config.get_re('collect.prefix')
                if config['collect.prefix-file'] else None)

    file_column = []
    arm_column = []
    seen_arm_markers = set()
    current_file = ''
    current_arm = ''
    has_file = False

    if 'type' in symbols.columns:
        for row in symbols.itertuples():
            if row.type == 'FILE':
                has_file = True
                current_file = row.symbol
                if prefixes:
                    current_file = simplify_source(current_file, prefixes)
            elif (row.type == 'NOTYPE' and row.symbol.startswith('$')
                  and (current_arm or row.symbol in ARM_SPECIAL_SYMBOLS)):
                # After the first recognised marker, any '$'-prefixed
                # NOTYPE symbol counts as a marker.
                current_arm = row.symbol
                seen_arm_markers.add(current_arm)
            # One entry per row, so the lists line up with the table.
            file_column.append(current_file)
            arm_column.append(current_arm)

    # current_arm is never reset, so it is non-empty exactly when at least
    # one marker was seen.
    has_arm = bool(current_arm)

    # Both columns must be attached before any row is filtered out,
    # otherwise the list lengths would no longer match the table.
    if has_file:
        symbols['file'] = file_column
    if has_arm:
        symbols['arm'] = arm_column

    if has_file:
        symbols = symbols[symbols['type'] != 'FILE']
    if has_arm:
        symbols = symbols[~symbols.symbol.isin(seen_arm_markers)]
    return symbols
def postprocess_file(config: Config, dfs: DFs) -> None:
    """Postprocess, in place, the tables collected from a single source.

    Only the symbol table is affected: when present it is replaced by the
    result of postprocess_symbols().
    """
    key = SymbolDF.name
    if key not in dfs:
        return
    dfs[key] = postprocess_symbols(config, dfs[key])
def fill_holes(config: Config, symbols: SymbolDF, sections: SectionDF) -> DFs:
    """Account for space not used by any symbol, or by multiple symbols.

    Symbols belonging to the considered sections are visited in address
    order.  Wherever the running end address of the preceding symbols does
    not line up with the next symbol's address, a synthetic row is created:
    'gap' or 'unused' when there is uncovered space, 'overlap' when the next
    symbol starts before the running end address.

    Args:
        config: Read for 'symbol.free.start', 'symbol.free.end' and
            'region.sections'.
        symbols: Symbol table.  The code reads its 'symbol', 'address',
            'size', 'section', 'type' and 'file' columns, plus 'cu' and
            'input' when those columns exist.
        sections: Section table.  The code reads 'section', 'address',
            'size' and 'flags' of each row.

    Returns:
        A dict holding one ExtentDF for each of 'gap', 'unused' and
        'overlap', and under SymbolDF.name the original symbols combined
        with all synthetic rows, sorted by address.
    """
    # These symbols mark the start or end of unused space.
    start_unused = frozenset(config.get('symbol.free.start', []))
    end_unused = frozenset(config.get('symbol.free.end', []))

    # Columns of the returned extent tables; 'cu' and 'input' are carried
    # along only when the symbol table has them.
    extent_columns = ['address', 'size', 'section', 'file']
    need_cu = 'cu' in symbols.columns
    if need_cu:
        extent_columns.append('cu')
    need_input = 'input' in symbols.columns
    if need_input:
        extent_columns.append('input')
    # Column order of every synthetic row built by filler().
    columns = ['symbol', *extent_columns, 'type', 'bind']

    def filler(name, address, size, previous, current) -> List:
        """Build one synthetic row, with values in `columns` order.

        Section, file, cu and input are taken from `previous` when it is
        set, otherwise from `current`, otherwise a default is used.
        """
        row = [
            name,  # symbol
            address,  # address
            size,  # size
            (previous.section if previous else
             current.section if current else memdf.name.UNDEF),  # section
            (previous.file
             if previous else current.file if current else ''),  # file
        ]
        if need_cu:
            row.append(
                previous.cu if previous else current.cu if current else '')
        if need_input:
            row.append(previous.input if previous else current.
                       input if current else '')
        row.append('NOTYPE')  # type
        row.append('LOCAL')  # bind
        return row

    def fill_gap(previous, current, from_address,
                 to_address) -> Tuple[str, List]:
        """Add a row for an unaccounted gap or unused space.

        The space is classed as 'unused' when it starts a section
        (`previous` is None), follows a configured start-of-free-space
        symbol, or precedes a configured end-of-free-space symbol; otherwise
        it is a 'gap'.  Returns the class name and the new row.
        """
        size = to_address - from_address
        if (previous is None or previous.symbol in start_unused
                or current.symbol in end_unused):
            use = 'unused'
            name = memdf.name.unused(from_address, size)
        else:
            use = 'gap'
            name = memdf.name.gap(from_address, size)
        return (use, filler(name, from_address, size, previous, current))

    def fill_overlap(previous, current, from_address,
                     to_address) -> Tuple[str, List]:
        """Add a row for overlap.

        The caller passes from_address > to_address, so `size` is negative
        here: the row records the negative size, while the name is built
        from its magnitude.
        """
        size = to_address - from_address
        return ('overlap',
                filler(memdf.name.overlap(from_address, -size), from_address,
                       size, previous, current))

    # Find the address range for sections that are configured or allocated.
    config_sections = set()
    for _, s in config.get('region.sections', {}).items():
        config_sections |= set(s)
    section_to_range = {}
    start_to_section = {}
    # The leading 0 guarantees that the bisect lookup below always finds an
    # entry at or before any non-negative address.
    section_starts = [0]
    for s in sections.itertuples():
        # A section is considered if it is named in the configuration or has
        # the SHF_ALLOC flag bit set.
        if ((s.section in config_sections) or (s.flags & SH_FLAGS.SHF_ALLOC)):
            section_to_range[s.section] = range(s.address, s.address + s.size)
            start_to_section[s.address] = s.section
            section_starts.append(s.address)
    section_starts.sort()

    # Synthetic rows, grouped by the class returned from fill_gap() and
    # fill_overlap().
    new_symbols: Dict[str, List[list]] = {
        'gap': [],
        'unused': [],
        'overlap': []
    }
    # State of the scan: the range of the section being walked, the last
    # symbol accepted in it, and the end address of that symbol.
    section_range = None
    previous_symbol = None
    current_address = 0
    # Only non-SECTION, non-FILE symbols in a considered section take part.
    iterable_symbols = symbols.loc[(symbols.type != 'SECTION')
                                   & (symbols.type != 'FILE')
                                   & symbols.section.isin(section_to_range)]
    iterable_symbols = iterable_symbols.sort_values(by='address')

    for symbol in iterable_symbols.itertuples():
        if not previous_symbol or symbol.section != previous_symbol.section:
            # We sometimes see symbols that have the value of their section end
            # address (so they are not actually within the section) and have
            # the same address as a symbol in the next section.
            #
            # Look up the section with the greatest start address that is
            # <= symbol.address, and skip the symbol if that is not the
            # section the symbol claims to be in.
            symbol_address_section = start_to_section.get(section_starts[
                bisect.bisect_right(section_starts, symbol.address) - 1])
            if symbol_address_section != symbol.section:
                continue
            # Starting or switching sections.
            if previous_symbol and section_range:
                # previous_symbol is the last in its section.
                # section_range[-1] + 1 is the section's end address; fill
                # whatever lies between the last symbol and that end.
                # NOTE(review): this runs only when a later symbol opens
                # another section, so the tail of the last section visited
                # is not filled -- confirm this is intended.
                if current_address < section_range[-1] + 1:
                    use, row = fill_gap(previous_symbol, previous_symbol,
                                        current_address, section_range[-1] + 1)
                    new_symbols[use].append(row)
            # Start of section.
            previous_symbol = None
            section_range = section_to_range.get(symbol.section)
            # An empty (zero-size) range is falsy, so such sections are
            # skipped here and in the check below.
            if section_range:
                current_address = section_range[0]
        if section_range:
            if current_address < symbol.address:
                use, row = fill_gap(previous_symbol, symbol, current_address,
                                    symbol.address)
                new_symbols[use].append(row)
            elif current_address > symbol.address:
                use, row = fill_overlap(previous_symbol, symbol,
                                        current_address, symbol.address)
                new_symbols[use].append(row)
        # Advance to the end of this symbol.
        current_address = symbol.address + symbol.size
        previous_symbol = symbol

    dfs = {k: SymbolDF(new_symbols[k], columns=columns) for k in new_symbols}
    # Append the synthetic rows to the symbol table; columns they lack end
    # up as missing values after the concat, which fillna('') replaces.
    symbols = pd.concat([symbols, *dfs.values()]).fillna('')
    symbols.sort_values(by='address', inplace=True)
    # Reduce each synthetic table to the extent columns and label it.
    for k in dfs:
        dfs[k] = ExtentDF(dfs[k][extent_columns])
        dfs[k].attrs['name'] = k
    dfs[SymbolDF.name] = SymbolDF(symbols)
    return dfs
def postprocess_collected(config: Config, dfs: DFs) -> None:
    """Postprocess, in place, the tables merged from all sources.

    Steps, in order:
      1. Prune the symbol and section tables according to the collected
         selection options.
      2. Unless 'args.fill_holes' is falsy, add the tables produced by
         fill_holes() when both a symbol and a section table exist.
      3. Synthesize each synthetic column on the symbol and section tables
         and prune by it.
      4. Record on every table which columns end in 'symbol' (under
         attrs['demangle']) and which end in 'address' (attrs['hexify']).
    """
    symbol_key = SymbolDF.name
    section_key = SectionDF.name
    table_keys = (symbol_key, section_key)

    # Pruning happens before fill_holes() so that the space of any pruned
    # symbols is accounted for, and so that no work is spent on pruned
    # sections.
    for key in table_keys:
        if key not in dfs:
            continue
        dfs[key] = memdf.select.select_configured(
            config, dfs[key], memdf.select.COLLECTED_CHOICES)

    # Account for space not used by any symbol, or by multiple symbols.
    if (symbol_key in dfs and section_key in dfs
            and config.get('args.fill_holes', True)):
        filled = fill_holes(config, dfs[symbol_key], dfs[section_key])
        dfs.update(filled)

    # Synthetic columns (e.g. 'region') are created after fill_holes() so
    # that the gap symbols receive values for them too.
    for key in table_keys:
        if key not in dfs:
            continue
        for column in memdf.select.SYNTHETIC_CHOICES:
            dfs[key] = memdf.select.synthesize_column(
                config, dfs[key], column)
            dfs[key] = memdf.select.select_configured_column(
                config, dfs[key], column)

    # Tag each table with its symbol-like and address-like column names;
    # an attribute is only set when at least one column matches.
    for table in dfs.values():
        for attr, suffix in (('demangle', 'symbol'), ('hexify', 'address')):
            matching = {col for col in table.columns if col.endswith(suffix)}
            if matching:
                table.attrs[attr] = matching
# Signature shared by the reader entry points; collect_files() calls a reader
# as reader(config, filename, method).
FileReader = Callable[[Config, str, str], DFs]

# Maps each accepted 'collect.method' value to the reader used for it.
FILE_READERS: Dict[str, FileReader] = {
    'bloaty': memdf.collector.bloaty.read_file,
    'elftools': memdf.collector.elftools.read_file,
    'readelf': memdf.collector.readelf.read_file,
    # 'csv' and 'tsv' share one reader.
    # NOTE(review): presumably it tells them apart by the method name it is
    # passed -- confirm in memdf.collector.csv.
    'csv': memdf.collector.csv.read_file,
    'tsv': memdf.collector.csv.read_file,
    # NOTE(review): the name suggests this reader takes a directory rather
    # than a single file -- confirm in memdf.collector.su.
    'su': memdf.collector.su.read_dir,
}
def collect_files(config: Config,
                  files: Optional[List[str]] = None,
                  method: Optional[str] = None) -> DFs:
    """Read a filtered memory map from a set of files.

    Args:
        config: Tool configuration.  Supplies 'args.inputs' when `files` is
            empty or None, and 'collect.method' when `method` is None.
        files: Inputs to read.
        method: Key into FILE_READERS selecting how the inputs are read.

    Returns:
        One table per table name, built by concatenating that table from
        every input (with a fresh index) and then postprocessing the whole
        set with postprocess_collected().
    """
    sources = files or config.get('args.inputs', [])
    if method is None:
        method = config.get('collect.method', 'csv')

    # Table name -> that table as read from each input, in input order.
    per_table: Dict[str, List[DF]] = {}
    for source in sources:
        tables: DFs = FILE_READERS[method](config, source, method)
        postprocess_file(config, tables)
        for key, table in tables.items():
            per_table.setdefault(key, []).append(table)

    merged = {
        key: pd.concat(parts, ignore_index=True)
        for key, parts in per_table.items()
    }
    postprocess_collected(config, merged)
    return merged
def parse_args(config_desc: Mapping, argv: Sequence[str]) -> Config:
    """Common argument parsing for collection tools.

    Args:
        config_desc: Tool-specific option descriptions.  They are merged
            last, so they take precedence over the shared and collection
            options on a key collision.
        argv: Argument list handed to the configuration's parser.

    Returns:
        The value returned by parsing `argv`.
    """
    base = Config()
    description = {
        **memdf.util.config.CONFIG,
        **CONFIG,
        **config_desc,
    }
    config = base.init(description)
    # Positional input files; nargs='+' means at least one must be given.
    config.argparse.add_argument('inputs', metavar='FILE', nargs='+')
    return config.parse(argv)