| #!/usr/bin/env python3 |
| |
| import struct |
| import binascii |
| import sys |
| import itertools as it |
| |
| TAG_TYPES = { |
| 'splice': (0x700, 0x400), |
| 'create': (0x7ff, 0x401), |
| 'delete': (0x7ff, 0x4ff), |
| 'name': (0x700, 0x000), |
| 'reg': (0x7ff, 0x001), |
| 'dir': (0x7ff, 0x002), |
| 'superblock': (0x7ff, 0x0ff), |
| 'struct': (0x700, 0x200), |
| 'dirstruct': (0x7ff, 0x200), |
| 'ctzstruct': (0x7ff, 0x202), |
| 'inlinestruct': (0x7ff, 0x201), |
| 'userattr': (0x700, 0x300), |
| 'tail': (0x700, 0x600), |
| 'softtail': (0x7ff, 0x600), |
| 'hardtail': (0x7ff, 0x601), |
| 'gstate': (0x700, 0x700), |
| 'movestate': (0x7ff, 0x7ff), |
| 'crc': (0x700, 0x500), |
| } |
| |
| class Tag: |
| def __init__(self, *args): |
| if len(args) == 1: |
| self.tag = args[0] |
| elif len(args) == 3: |
| if isinstance(args[0], str): |
| type = TAG_TYPES[args[0]][1] |
| else: |
| type = args[0] |
| |
| if isinstance(args[1], str): |
| id = int(args[1], 0) if args[1] not in 'x.' else 0x3ff |
| else: |
| id = args[1] |
| |
| if isinstance(args[2], str): |
| size = int(args[2], str) if args[2] not in 'x.' else 0x3ff |
| else: |
| size = args[2] |
| |
| self.tag = (type << 20) | (id << 10) | size |
| else: |
| assert False |
| |
| @property |
| def isvalid(self): |
| return not bool(self.tag & 0x80000000) |
| |
| @property |
| def isattr(self): |
| return not bool(self.tag & 0x40000000) |
| |
| @property |
| def iscompactable(self): |
| return bool(self.tag & 0x20000000) |
| |
| @property |
| def isunique(self): |
| return not bool(self.tag & 0x10000000) |
| |
| @property |
| def type(self): |
| return (self.tag & 0x7ff00000) >> 20 |
| |
| @property |
| def type1(self): |
| return (self.tag & 0x70000000) >> 20 |
| |
| @property |
| def type3(self): |
| return (self.tag & 0x7ff00000) >> 20 |
| |
| @property |
| def id(self): |
| return (self.tag & 0x000ffc00) >> 10 |
| |
| @property |
| def size(self): |
| return (self.tag & 0x000003ff) >> 0 |
| |
| @property |
| def dsize(self): |
| return 4 + (self.size if self.size != 0x3ff else 0) |
| |
| @property |
| def chunk(self): |
| return self.type & 0xff |
| |
| @property |
| def schunk(self): |
| return struct.unpack('b', struct.pack('B', self.chunk))[0] |
| |
| def is_(self, type): |
| return (self.type & TAG_TYPES[type][0]) == TAG_TYPES[type][1] |
| |
| def mkmask(self): |
| return Tag( |
| 0x700 if self.isunique else 0x7ff, |
| 0x3ff if self.isattr else 0, |
| 0) |
| |
| def chid(self, nid): |
| ntag = Tag(self.type, nid, self.size) |
| if hasattr(self, 'off'): ntag.off = self.off |
| if hasattr(self, 'data'): ntag.data = self.data |
| if hasattr(self, 'crc'): ntag.crc = self.crc |
| return ntag |
| |
| def typerepr(self): |
| if self.is_('crc') and getattr(self, 'crc', 0xffffffff) != 0xffffffff: |
| return 'crc (bad)' |
| |
| reverse_types = {v: k for k, v in TAG_TYPES.items()} |
| for prefix in range(12): |
| mask = 0x7ff & ~((1 << prefix)-1) |
| if (mask, self.type & mask) in reverse_types: |
| type = reverse_types[mask, self.type & mask] |
| if prefix > 0: |
| return '%s %#0*x' % ( |
| type, prefix//4, self.type & ((1 << prefix)-1)) |
| else: |
| return type |
| else: |
| return '%02x' % self.type |
| |
| def idrepr(self): |
| return repr(self.id) if self.id != 0x3ff else '.' |
| |
| def sizerepr(self): |
| return repr(self.size) if self.size != 0x3ff else 'x' |
| |
| def __repr__(self): |
| return 'Tag(%r, %d, %d)' % (self.typerepr(), self.id, self.size) |
| |
| def __lt__(self, other): |
| return (self.id, self.type) < (other.id, other.type) |
| |
| def __bool__(self): |
| return self.isvalid |
| |
| def __int__(self): |
| return self.tag |
| |
| def __index__(self): |
| return self.tag |
| |
| class MetadataPair: |
| def __init__(self, blocks): |
| if len(blocks) > 1: |
| self.pair = [MetadataPair([block]) for block in blocks] |
| self.pair = sorted(self.pair, reverse=True) |
| |
| self.data = self.pair[0].data |
| self.rev = self.pair[0].rev |
| self.tags = self.pair[0].tags |
| self.ids = self.pair[0].ids |
| self.log = self.pair[0].log |
| self.all_ = self.pair[0].all_ |
| return |
| |
| self.pair = [self] |
| self.data = blocks[0] |
| block = self.data |
| |
| self.rev, = struct.unpack('<I', block[0:4]) |
| crc = binascii.crc32(block[0:4]) |
| |
| # parse tags |
| corrupt = False |
| tag = Tag(0xffffffff) |
| off = 4 |
| self.log = [] |
| self.all_ = [] |
| while len(block) - off >= 4: |
| ntag, = struct.unpack('>I', block[off:off+4]) |
| |
| tag = Tag(int(tag) ^ ntag) |
| tag.off = off + 4 |
| tag.data = block[off+4:off+tag.dsize] |
| if tag.is_('crc'): |
| crc = binascii.crc32(block[off:off+4+4], crc) |
| else: |
| crc = binascii.crc32(block[off:off+tag.dsize], crc) |
| tag.crc = crc |
| off += tag.dsize |
| |
| self.all_.append(tag) |
| |
| if tag.is_('crc'): |
| # is valid commit? |
| if crc != 0xffffffff: |
| corrupt = True |
| if not corrupt: |
| self.log = self.all_.copy() |
| |
| # reset tag parsing |
| crc = 0 |
| tag = Tag(int(tag) ^ ((tag.type & 1) << 31)) |
| |
| # find active ids |
| self.ids = list(it.takewhile( |
| lambda id: Tag('name', id, 0) in self, |
| it.count())) |
| |
| # find most recent tags |
| self.tags = [] |
| for tag in self.log: |
| if tag.is_('crc') or tag.is_('splice'): |
| continue |
| elif tag.id == 0x3ff: |
| if tag in self and self[tag] is tag: |
| self.tags.append(tag) |
| else: |
| # id could have change, I know this is messy and slow |
| # but it works |
| for id in self.ids: |
| ntag = tag.chid(id) |
| if ntag in self and self[ntag] is tag: |
| self.tags.append(ntag) |
| |
| self.tags = sorted(self.tags) |
| |
| def __bool__(self): |
| return bool(self.log) |
| |
| def __lt__(self, other): |
| # corrupt blocks don't count |
| if not self or not other: |
| return bool(other) |
| |
| # use sequence arithmetic to avoid overflow |
| return not ((other.rev - self.rev) & 0x80000000) |
| |
| def __contains__(self, args): |
| try: |
| self[args] |
| return True |
| except KeyError: |
| return False |
| |
| def __getitem__(self, args): |
| if isinstance(args, tuple): |
| gmask, gtag = args |
| else: |
| gmask, gtag = args.mkmask(), args |
| |
| gdiff = 0 |
| for tag in reversed(self.log): |
| if (gmask.id != 0 and tag.is_('splice') and |
| tag.id <= gtag.id - gdiff): |
| if tag.is_('create') and tag.id == gtag.id - gdiff: |
| # creation point |
| break |
| |
| gdiff += tag.schunk |
| |
| if ((int(gmask) & int(tag)) == |
| (int(gmask) & int(gtag.chid(gtag.id - gdiff)))): |
| if tag.size == 0x3ff: |
| # deleted |
| break |
| |
| return tag |
| |
| raise KeyError(gmask, gtag) |
| |
| def _dump_tags(self, tags, f=sys.stdout, truncate=True): |
| f.write("%-8s %-8s %-13s %4s %4s" % ( |
| 'off', 'tag', 'type', 'id', 'len')) |
| if truncate: |
| f.write(' data (truncated)') |
| f.write('\n') |
| |
| for tag in tags: |
| f.write("%08x: %08x %-13s %4s %4s" % ( |
| tag.off, tag, |
| tag.typerepr(), tag.idrepr(), tag.sizerepr())) |
| if truncate: |
| f.write(" %-23s %-8s\n" % ( |
| ' '.join('%02x' % c for c in tag.data[:8]), |
| ''.join(c if c >= ' ' and c <= '~' else '.' |
| for c in map(chr, tag.data[:8])))) |
| else: |
| f.write("\n") |
| for i in range(0, len(tag.data), 16): |
| f.write(" %08x: %-47s %-16s\n" % ( |
| tag.off+i, |
| ' '.join('%02x' % c for c in tag.data[i:i+16]), |
| ''.join(c if c >= ' ' and c <= '~' else '.' |
| for c in map(chr, tag.data[i:i+16])))) |
| |
| def dump_tags(self, f=sys.stdout, truncate=True): |
| self._dump_tags(self.tags, f=f, truncate=truncate) |
| |
| def dump_log(self, f=sys.stdout, truncate=True): |
| self._dump_tags(self.log, f=f, truncate=truncate) |
| |
| def dump_all(self, f=sys.stdout, truncate=True): |
| self._dump_tags(self.all_, f=f, truncate=truncate) |
| |
| def main(args): |
| blocks = [] |
| with open(args.disk, 'rb') as f: |
| for block in [args.block1, args.block2]: |
| if block is None: |
| continue |
| f.seek(block * args.block_size) |
| blocks.append(f.read(args.block_size) |
| .ljust(args.block_size, b'\xff')) |
| |
| # find most recent pair |
| mdir = MetadataPair(blocks) |
| |
| try: |
| mdir.tail = mdir[Tag('tail', 0, 0)] |
| if mdir.tail.size != 8 or mdir.tail.data == 8*b'\xff': |
| mdir.tail = None |
| except KeyError: |
| mdir.tail = None |
| |
| print("mdir {%s} rev %d%s%s%s" % ( |
| ', '.join('%#x' % b |
| for b in [args.block1, args.block2] |
| if b is not None), |
| mdir.rev, |
| ' (was %s)' % ', '.join('%d' % m.rev for m in mdir.pair[1:]) |
| if len(mdir.pair) > 1 else '', |
| ' (corrupted!)' if not mdir else '', |
| ' -> {%#x, %#x}' % struct.unpack('<II', mdir.tail.data) |
| if mdir.tail else '')) |
| if args.all: |
| mdir.dump_all(truncate=not args.no_truncate) |
| elif args.log: |
| mdir.dump_log(truncate=not args.no_truncate) |
| else: |
| mdir.dump_tags(truncate=not args.no_truncate) |
| |
| return 0 if mdir else 1 |
| |
| if __name__ == "__main__": |
| import argparse |
| import sys |
| parser = argparse.ArgumentParser( |
| description="Dump useful info about metadata pairs in littlefs.") |
| parser.add_argument('disk', |
| help="File representing the block device.") |
| parser.add_argument('block_size', type=lambda x: int(x, 0), |
| help="Size of a block in bytes.") |
| parser.add_argument('block1', type=lambda x: int(x, 0), |
| help="First block address for finding the metadata pair.") |
| parser.add_argument('block2', nargs='?', type=lambda x: int(x, 0), |
| help="Second block address for finding the metadata pair.") |
| parser.add_argument('-l', '--log', action='store_true', |
| help="Show tags in log.") |
| parser.add_argument('-a', '--all', action='store_true', |
| help="Show all tags in log, included tags in corrupted commits.") |
| parser.add_argument('-T', '--no-truncate', action='store_true', |
| help="Don't truncate large amounts of data.") |
| sys.exit(main(parser.parse_args())) |