pw_kvs: Add initial parts self-repair from errors

Add self-repair/recovery when the KVS detects errors. This repair is two parts:
- Ensure that each stored key has the proper number of redundant entries.
- Garbage collect (erase and re-init) any sectors with corrupt bytes detected.

This change has error detection in KVS init() and uses a stubbed out Recover().
The actual repair process and error detection during normal operation (Get, Put,
GC) is WIP and not included in this change.

Change-Id: Id3398dd91a5665be8eb1a3aba57961cbbaf9411d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 091c244..50af414 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -56,14 +56,15 @@
       entry_cache_(key_descriptor_list, addresses, redundancy),
       sectors_(sector_descriptor_list),
       temp_sectors_to_skip_(temp_sectors_to_skip),
-      options_(options) {
-  initialized_ = false;
-  last_new_sector_ = nullptr;
-  last_transaction_id_ = 0;
-}
+      options_(options),
+      initialized_(false),
+      error_detected_(false),
+      last_new_sector_(nullptr),
+      last_transaction_id_(0) {}
 
 Status KeyValueStore::Init() {
   initialized_ = false;
+  error_detected_ = false;
   last_new_sector_ = nullptr;
   last_transaction_id_ = 0;
   entry_cache_.Reset();
@@ -79,6 +80,15 @@
 
   const size_t sector_size_bytes = partition_.sector_size_bytes();
 
+  // TODO: investigate doing this as a static assert/compile-time check.
+  if (sector_size_bytes > SectorDescriptor::max_sector_size()) {
+    ERR("KVS init failed: sector_size_bytes (=%zu) is greater than maximum "
+        "allowed sector size (=%zu)",
+        sector_size_bytes,
+        SectorDescriptor::max_sector_size());
+    return Status::FAILED_PRECONDITION;
+  }
+
   DBG("First pass: Read all entries from all sectors");
   Address sector_address = 0;
 
@@ -118,6 +128,7 @@
             SectorIndex(&sector),
             size_t(entry_address));
 
+        error_detected_ = true;
         corrupt_entries++;
 
         status = ScanForEntry(sector,
@@ -156,7 +167,8 @@
       // being written to it by indicating that it has no space. This should
       // also make it a decent GC candidate. Valid keys in the sector are still
       // readable as normal.
-      sector.set_writable_bytes(0);
+      sector.mark_corrupt();
+      error_detected_ = true;
 
       WRN("Sector %u contains %zuB of corrupt data",
           SectorIndex(&sector),
@@ -176,6 +188,9 @@
   // For every valid entry, count the valid bytes in that sector. Track which
   // entry has the newest transaction ID for initializing last_new_sector_.
   for (const EntryMetadata& metadata : entry_cache_) {
+    if (metadata.addresses().size() < redundancy()) {
+      error_detected_ = true;
+    }
     for (Address address : metadata.addresses()) {
       Entry entry;
       TRY(Entry::Read(partition_, address, formats_, &entry));
@@ -189,6 +204,15 @@
 
   last_new_sector_ = SectorFromAddress(newest_key);
 
+  if (error_detected_) {
+    Status recovery_status = Repair();
+    if (recovery_status.ok()) {
+      INF("KVS init: Corruption detected and fully repaired");
+    } else {
+      ERR("KVS init: Corruption detected and unable repair");
+    }
+  }
+
   if (!empty_sector_found) {
     // TODO: Record/report the error condition and recovery result.
     Status gc_result = GarbageCollectPartial();
diff --git a/pw_kvs/public/pw_kvs/internal/sector_descriptor.h b/pw_kvs/public/pw_kvs/internal/sector_descriptor.h
index 340ede3..58576c7 100644
--- a/pw_kvs/public/pw_kvs/internal/sector_descriptor.h
+++ b/pw_kvs/public/pw_kvs/internal/sector_descriptor.h
@@ -13,6 +13,7 @@
 // the License.
 #pragma once
 
+#include <climits>
 #include <cstddef>
 #include <cstdint>
 
@@ -27,13 +28,19 @@
   SectorDescriptor(const SectorDescriptor&) = default;
   SectorDescriptor& operator=(const SectorDescriptor&) = default;
 
-  // The number of bytes available to be written in this sector.
-  size_t writable_bytes() const { return tail_free_bytes_; }
+  // The number of bytes available to be written in this sector. It the sector
+  // is marked as corrupt, no bytes are available.
+  size_t writable_bytes() const {
+    return (tail_free_bytes_ == kCorruptSector) ? 0 : tail_free_bytes_;
+  }
 
   void set_writable_bytes(uint16_t writable_bytes) {
     tail_free_bytes_ = writable_bytes;
   }
 
+  void mark_corrupt() { tail_free_bytes_ = kCorruptSector; }
+  bool corrupt() const { return tail_free_bytes_ == kCorruptSector; }
+
   // The number of bytes of valid data in this sector.
   size_t valid_bytes() const { return valid_bytes_; }
 
@@ -61,20 +68,24 @@
   }
 
   bool HasSpace(size_t required_space) const {
-    return tail_free_bytes_ >= required_space;
+    return writable_bytes() >= required_space;
   }
 
   bool Empty(size_t sector_size_bytes) const {
-    return tail_free_bytes_ == sector_size_bytes;
+    return writable_bytes() == sector_size_bytes;
   }
 
   // Returns the number of bytes that would be recovered if this sector is
   // garbage collected.
   size_t RecoverableBytes(size_t sector_size_bytes) const {
-    return sector_size_bytes - valid_bytes_ - tail_free_bytes_;
+    return sector_size_bytes - valid_bytes_ - writable_bytes();
   }
 
+  static constexpr size_t max_sector_size() { return kMaxSectorSize; }
+
  private:
+  static constexpr uint16_t kCorruptSector = UINT16_MAX;
+  static constexpr size_t kMaxSectorSize = UINT16_MAX - 1;
   uint16_t tail_free_bytes_;  // writable bytes at the end of the sector
   uint16_t valid_bytes_;      // sum of sizes of valid entries
 };
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 1e23c45..be90f9d 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -392,6 +392,8 @@
   Status GarbageCollectSector(SectorDescriptor* sector_to_gc,
                               span<const Address> addresses_to_skip);
 
+  Status Repair() { return Status::UNIMPLEMENTED; }
+
   bool AddressInSector(const SectorDescriptor& sector, Address address) const {
     const Address sector_base = SectorBaseAddress(&sector);
     const Address sector_end = sector_base + partition_.sector_size_bytes();
@@ -449,6 +451,8 @@
 
   bool initialized_;
 
+  bool error_detected_;
+
   // The last sector that was selected as the "new empty sector" to write to.
   // This last new sector is used as the starting point for the next "find a new
   // empty sector to write to" operation. By using the last new sector as the