pw_kvs: Add KVS error repair/recovery

- Add repair of detected errors in the KVS.
- Add additional error detection during KVS init.
- KVS initialization has a new "needs maintenance" state. This is a
  read-only mode, entered when not all of the required KVS invariants
  have been met (see the usage sketch below).
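
A minimal sketch of the intended usage (hypothetical caller code, not part
of this change; the kvs instance, buffer, and key name are illustrative):

    // Assumes a constructed KeyValueStoreBuffer named kvs.
    Status status = kvs.Init();

    if (!kvs.initialized()) {
      // Init() found corruption it could not (or was not allowed to) repair,
      // so the KVS is in the read-only "needs maintenance" state: reads such
      // as Get() and ValueSize() work, but Put() and Delete() return
      // FAILED_PRECONDITION.
      std::byte buffer[64];
      StatusWithSize result = kvs.Get("existing_key", buffer);

      // FullMaintenance() repairs corrupt sectors, restores the required
      // free sector, and adds missing redundant copies; on success the KVS
      // returns to the fully initialized, writable state.
      status = kvs.FullMaintenance();
    }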

Change-Id: If13094855d8da03d59011891c8ed5af2ad38ad2d
diff --git a/pw_kvs/entry_cache.cc b/pw_kvs/entry_cache.cc
index 447417d..1f4dfa8 100644
--- a/pw_kvs/entry_cache.cc
+++ b/pw_kvs/entry_cache.cc
@@ -31,6 +31,23 @@
 
 }  // namespace
 
+void EntryMetadata::RemoveAddress(Address address_to_remove) {
+  // Find the index of the address to remove.
+  for (Address& address : addresses_) {
+    if (address == address_to_remove) {
+      // Move the address at the back of the list to the slot of the address
+      // being removed. Do this unconditionally, even when the address to
+      // remove is in the last slot, since the logic still works in that case.
+      address = addresses_.back();
+
+      // Remove the back entry of the address list.
+      addresses_.back() = kNoAddress;
+      addresses_ = span(addresses_.begin(), addresses_.size() - 1);
+      break;
+    }
+  }
+}
+
 void EntryMetadata::Reset(const KeyDescriptor& descriptor, Address address) {
   *descriptor_ = descriptor;
 
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d0ab512..9fa7199 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -48,13 +48,15 @@
       sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
       entry_cache_(key_descriptor_list, addresses, redundancy),
       options_(options),
-      initialized_(false),
+      initialized_(InitializationState::kNotInitialized),
       error_detected_(false),
+      error_stats_({}),
       last_transaction_id_(0) {}
 
 Status KeyValueStore::Init() {
-  initialized_ = false;
+  initialized_ = InitializationState::kNotInitialized;
   error_detected_ = false;
+  error_stats_ = {};
   last_transaction_id_ = 0;
   sectors_.Reset();
   entry_cache_.Reset();
@@ -191,26 +193,34 @@
 
   sectors_.set_last_new_sector(newest_key);
 
-  if (error_detected_) {
-    Status recovery_status = Repair();
-    if (recovery_status.ok()) {
-      INF("KVS init: Corruption detected and fully repaired");
-    } else {
-      ERR("KVS init: Corruption detected and unable repair");
-    }
-  }
-
   if (!empty_sector_found) {
-    // TODO: Record/report the error condition and recovery result.
-    Status gc_result = GarbageCollectPartial();
-
-    if (!gc_result.ok()) {
-      ERR("KVS init failed: Unable to maintain required free sector");
-      return Status::INTERNAL;
-    }
+    error_detected_ = true;
   }
 
-  initialized_ = true;
+  if (!error_detected_) {
+    initialized_ = InitializationState::kReady;
+  } else {
+    if (options_.recovery != ErrorRecovery::kManual) {
+      WRN("KVS init: Corruption detected, beginning repair");
+      Status recovery_status = Repair();
+
+      if (recovery_status.ok()) {
+        WRN("KVS init: Corruption detected and fully repaired");
+        initialized_ = InitializationState::kReady;
+        total_corrupt_bytes = 0;
+        corrupt_entries = 0;
+      } else if (recovery_status == Status::RESOURCE_EXHAUSTED) {
+        WRN("KVS init: Unable to maintain required free sector");
+        initialized_ = InitializationState::kNeedsMaintenance;
+      } else {
+        WRN("KVS init: Corruption detected and unable repair");
+        initialized_ = InitializationState::kNeedsMaintenance;
+      }
+    } else {
+      WRN("KVS init: Corruption detected, no repair attempted due to options");
+      initialized_ = InitializationState::kNeedsMaintenance;
+    }
+  }
 
   INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
       "%zu, logical sector size %zu bytes",
@@ -219,6 +229,7 @@
       sectors_.size(),
       partition_.sector_size_bytes());
 
+  // Report any corruption that was not repaired.
   if (total_corrupt_bytes > 0) {
     WRN("Found %zu corrupt bytes and %d corrupt entries during init process; "
         "some keys may be missing",
@@ -231,9 +242,12 @@
 }
 
 KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
-  StorageStats stats{0, 0, 0};
+  StorageStats stats{};
   const size_t sector_size = partition_.sector_size_bytes();
   bool found_empty_sector = false;
+  stats.corrupt_sectors_recovered = error_stats_.corrupt_sectors_recovered;
+  stats.missing_redundant_entries_recovered =
+      error_stats_.missing_redundant_entries_recovered;
 
   for (const SectorDescriptor& sector : sectors_) {
     stats.in_use_bytes += sector.valid_bytes();
@@ -255,6 +269,28 @@
   return stats;
 }
 
+bool KeyValueStore::CheckForErrors() {
+  // Check for corrupted sectors.
+  for (SectorDescriptor& sector : sectors_) {
+    if (sector.corrupt()) {
+      error_detected_ = true;
+      break;
+    }
+  }
+
+  // Check for missing redundancy.
+  if (redundancy() > 1) {
+    for (const EntryMetadata& metadata : entry_cache_) {
+      if (metadata.addresses().size() < redundancy()) {
+        error_detected_ = true;
+        break;
+      }
+    }
+  }
+
+  return error_detected();
+}
+
 Status KeyValueStore::LoadEntry(Address entry_address,
                                 Address* next_entry_address) {
   Entry entry;
@@ -304,7 +340,7 @@
 StatusWithSize KeyValueStore::Get(string_view key,
                                   span<byte> value_buffer,
                                   size_t offset_bytes) const {
-  TRY_WITH_SIZE(CheckOperation(key));
+  TRY_WITH_SIZE(CheckReadOperation(key));
 
   EntryMetadata metadata;
   TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -313,12 +349,11 @@
 }
 
 Status KeyValueStore::PutBytes(string_view key, span<const byte> value) {
+  TRY(CheckWriteOperation(key));
   DBG("Writing key/value; key length=%zu, value length=%zu",
       key.size(),
       value.size());
 
-  TRY(CheckOperation(key));
-
   if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
     DBG("%zu B value with %zu B key cannot fit in one sector",
         value.size(),
@@ -346,7 +381,7 @@
 }
 
 Status KeyValueStore::Delete(string_view key) {
-  TRY(CheckOperation(key));
+  TRY(CheckWriteOperation(key));
 
   EntryMetadata metadata;
   TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -391,7 +426,7 @@
 }
 
 StatusWithSize KeyValueStore::ValueSize(string_view key) const {
-  TRY_WITH_SIZE(CheckOperation(key));
+  TRY_WITH_SIZE(CheckReadOperation(key));
 
   EntryMetadata metadata;
   TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -426,7 +461,7 @@
 Status KeyValueStore::FixedSizeGet(std::string_view key,
                                    void* value,
                                    size_t size_bytes) const {
-  TRY(CheckOperation(key));
+  TRY(CheckReadOperation(key));
 
   EntryMetadata metadata;
   TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -463,16 +498,31 @@
   return StatusWithSize(entry.value_size());
 }
 
-Status KeyValueStore::CheckOperation(string_view key) const {
+Status KeyValueStore::CheckWriteOperation(string_view key) const {
   if (InvalidKey(key)) {
     return Status::INVALID_ARGUMENT;
   }
+
+  // For normal write operations, the KVS must be fully ready.
   if (!initialized()) {
     return Status::FAILED_PRECONDITION;
   }
   return Status::OK;
 }
 
+Status KeyValueStore::CheckReadOperation(string_view key) const {
+  if (InvalidKey(key)) {
+    return Status::INVALID_ARGUMENT;
+  }
+
+  // Operations that are explicitly read-only can be done after Init() has
+  // been called, even if the KVS is not fully ready (i.e. needs maintenance).
+  if (initialized_ == InitializationState::kNotInitialized) {
+    return Status::FAILED_PRECONDITION;
+  }
+  return Status::OK;
+}
+
 Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
                                                EntryState new_state,
                                                string_view key,
@@ -575,7 +625,7 @@
       do_auto_gc = false;
     }
     // Garbage collect and then try again to find the best sector.
-    Status gc_status = GarbageCollectPartial(reserved);
+    Status gc_status = GarbageCollect(reserved);
     if (!gc_status.ok()) {
       if (gc_status == Status::NOT_FOUND) {
         // Not enough space, and no reclaimable bytes, this KVS is full!
@@ -603,6 +653,16 @@
   return result;
 }
 
+Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
+                                               SectorDescriptor* sector) {
+  if (!status.ok()) {
+    DBG("  Sector %u corrupt", sectors_.Index(sector));
+    sector->mark_corrupt();
+    error_detected_ = true;
+  }
+  return status;
+}
+
 Status KeyValueStore::AppendEntry(const Entry& entry,
                                   string_view key,
                                   span<const byte> value) {
@@ -618,11 +678,11 @@
         entry.size(),
         size_t(entry.address()),
         result.size());
-    return result.status();
+    TRY(MarkSectorCorruptIfNotOk(result.status(), &sector));
   }
 
   if (options_.verify_on_write) {
-    TRY(entry.VerifyChecksumInFlash());
+    TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
   }
 
   sector.AddValidBytes(result.size());
@@ -648,20 +708,32 @@
 
   const Address new_address = sectors_.NextWritableAddress(*new_sector);
   const StatusWithSize result = entry.Copy(new_address);
-  new_sector->RemoveWritableBytes(result.size());
-  TRY(result);
 
+  TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+  if (options_.verify_on_write) {
+    TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+  }
   // Entry was written successfully; update descriptor's address and the sector
   // descriptors to reflect the new entry.
-  sectors_.FromAddress(address).RemoveValidBytes(result.size());
+  new_sector->RemoveWritableBytes(result.size());
   new_sector->AddValidBytes(result.size());
+  sectors_.FromAddress(address).RemoveValidBytes(result.size());
   address = new_address;
 
   return Status::OK;
 }
 
-Status KeyValueStore::GarbageCollectFull() {
-  DBG("Garbage Collect all sectors");
+Status KeyValueStore::FullMaintenance() {
+  if (initialized_ == InitializationState::kNotInitialized) {
+    return Status::FAILED_PRECONDITION;
+  }
+
+  DBG("Do full maintenance");
+
+  if (error_detected_) {
+    TRY(Repair());
+  }
 
   SectorDescriptor* sector = sectors_.last_new();
 
@@ -678,12 +750,20 @@
     }
   }
 
-  DBG("Garbage Collect all complete");
+  DBG("Full maintenance complete");
   return Status::OK;
 }
 
-Status KeyValueStore::GarbageCollectPartial(
-    span<const Address> reserved_addresses) {
+Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
+  if (initialized_ == InitializationState::kNotInitialized) {
+    return Status::FAILED_PRECONDITION;
+  }
+
+  // Do automatic repair if the KVS options allow for it.
+  if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
+    TRY(Repair());
+  }
+
   DBG("Garbage Collect a single sector");
   for (Address address : reserved_addresses) {
     DBG("   Avoid address %u", unsigned(address));
@@ -720,6 +800,7 @@
 
 Status KeyValueStore::GarbageCollectSector(
     SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
+  DBG("  Garbage Collect sector %u", sectors_.Index(sector_to_gc));
   // Step 1: Move any valid entries in the GC sector to other sectors
   if (sector_to_gc.valid_bytes() != 0) {
     for (const EntryMetadata& metadata : entry_cache_) {
@@ -736,7 +817,7 @@
   }
 
   // Step 2: Reinitialize the sector
-  sector_to_gc.set_writable_bytes(0);
+  sector_to_gc.mark_corrupt();
   TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
   sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
 
@@ -744,6 +825,171 @@
   return Status::OK;
 }
 
+// Add any missing redundant entries/copies for a key.
+Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
+  SectorDescriptor* new_sector;
+
+  Entry entry;
+
+  // For simplicity use just the first copy. Any known bad copies should have
+  // been removed already.
+  // TODO: Add support to read other copies if needed.
+  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+  TRY(entry.VerifyChecksumInFlash());
+
+  for (size_t i = metadata.addresses().size();
+       metadata.addresses().size() < redundancy();
+       i++) {
+    TRY(sectors_.FindSpace(&new_sector, entry.size(), metadata.addresses()));
+
+    const Address new_address = sectors_.NextWritableAddress(*new_sector);
+    const StatusWithSize result = entry.Copy(new_address);
+    TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+    if (options_.verify_on_write) {
+      TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+    }
+    // Entry was written successfully; update descriptor's address and the
+    // sector descriptors to reflect the new entry.
+    new_sector->RemoveWritableBytes(result.size());
+    new_sector->AddValidBytes(result.size());
+
+    metadata.AddNewAddress(new_address);
+  }
+  return Status::OK;
+}
+
+Status KeyValueStore::RepairCorruptSectors() {
+  // Try to GC each corrupt sector, even if previous sectors fail. If GC of a
+  // sector fails on the first pass, do a second pass, since a later sector
+  // might have cleared up space or otherwise unblocked the earlier failed
+  // sector.
+  Status repair_status = Status::OK;
+
+  size_t loop_count = 0;
+  do {
+    loop_count++;
+    // Error of RESOURCE_EXHAUSTED indicates no space found for relocation.
+    // Reset back to OK for the next pass.
+    if (repair_status == Status::RESOURCE_EXHAUSTED) {
+      repair_status = Status::OK;
+    }
+
+    DBG("   Pass %u", unsigned(loop_count));
+    for (SectorDescriptor& sector : sectors_) {
+      if (sector.corrupt()) {
+        DBG("   Found sector %u with corruption", sectors_.Index(sector));
+        Status sector_status = GarbageCollectSector(sector, {});
+        if (sector_status.ok()) {
+          error_stats_.corrupt_sectors_recovered += 1;
+        } else if (repair_status.ok() ||
+                   repair_status == Status::RESOURCE_EXHAUSTED) {
+          repair_status = sector_status;
+        }
+      }
+    }
+    DBG("   Pass %u complete", unsigned(loop_count));
+  } while (!repair_status.ok() && loop_count < 2);
+
+  return repair_status;
+}
+
+Status KeyValueStore::EnsureFreeSectorExists() {
+  Status repair_status = Status::OK;
+  bool empty_sector_found = false;
+
+  DBG("   Find empty sector");
+  for (SectorDescriptor& sector : sectors_) {
+    if (sector.Empty(partition_.sector_size_bytes())) {
+      empty_sector_found = true;
+      DBG("   Empty sector found");
+      break;
+    }
+  }
+  if (empty_sector_found == false) {
+    DBG("   No empty sector found, attempting to GC a free sector");
+    Status sector_status = GarbageCollect(span<const Address, 0>());
+    if (repair_status.ok() && !sector_status.ok()) {
+      DBG("   Unable to free an empty sector");
+      repair_status = sector_status;
+    }
+  }
+
+  return repair_status;
+}
+
+Status KeyValueStore::EnsureEntryRedundancy() {
+  Status repair_status = Status::OK;
+
+  if (redundancy() == 1) {
+    DBG("   Redundancy not in use, nothing to check");
+    return Status::OK;
+  }
+
+  DBG("   Write any needed additional duplicate copies of key to fulfill %u "
+      "redundancy",
+      unsigned(redundancy()));
+  for (const EntryMetadata& metadata : entry_cache_) {
+    if (metadata.addresses().size() >= redundancy()) {
+      continue;
+    }
+
+    DBG("   Key with %u of %u copies found, adding missing copies",
+        unsigned(metadata.addresses().size()),
+        unsigned(redundancy()));
+    // TODO: Add non-const iterator to the entry_cache_ and remove this
+    // const_cast.
+    Status fill_status =
+        AddRedundantEntries(const_cast<EntryMetadata&>(metadata));
+    if (fill_status.ok()) {
+      error_stats_.missing_redundant_entries_recovered += 1;
+      DBG("   Key missing copies added");
+    } else {
+      DBG("   Failed to add key missing copies");
+      if (repair_status.ok()) {
+        repair_status = fill_status;
+      }
+    }
+  }
+
+  return repair_status;
+}
+
+Status KeyValueStore::Repair() {
+  // Collect and return the first error encountered.
+  Status overall_status = Status::OK;
+
+  DBG("KVS repair");
+
+  // Step 1: Garbage collect any sectors marked as corrupt.
+  Status repair_status = RepairCorruptSectors();
+  if (overall_status.ok()) {
+    overall_status = repair_status;
+  }
+
+  // Step 2: Make sure there is at least 1 empty sector. This needs to be a
+  // separate check of sectors from step 1, because a found empty sector might
+  // get written to by a later GC that fails and does not result in a free
+  // sector.
+  repair_status = EnsureFreeSectorExists();
+  if (overall_status.ok()) {
+    overall_status = repair_status;
+  }
+
+  // Step 3: Make sure each stored key has the full number of redundant
+  // entries.
+  repair_status = EnsureEntryRedundancy();
+  if (overall_status.ok()) {
+    overall_status = repair_status;
+  }
+
+  if (overall_status.ok()) {
+    error_detected_ = false;
+    initialized_ = InitializationState::kReady;
+  }
+  return overall_status;
+}
+
 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
                                                 string_view key,
                                                 span<const byte> value,
@@ -862,7 +1108,7 @@
 
 void KeyValueStore::LogKeyDescriptor() const {
   DBG("Key descriptors: count %zu", entry_cache_.total_entries());
-  for (auto& metadata : entry_cache_) {
+  for (const EntryMetadata& metadata : entry_cache_) {
     DBG("  - Key: %s, hash %#zx, transaction ID %zu, first address %#zx",
         metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
         static_cast<size_t>(metadata.hash()),
diff --git a/pw_kvs/key_value_store_binary_format_test.cc b/pw_kvs/key_value_store_binary_format_test.cc
index 6ae2e5d..8042712 100644
--- a/pw_kvs/key_value_store_binary_format_test.cc
+++ b/pw_kvs/key_value_store_binary_format_test.cc
@@ -99,8 +99,17 @@
 }
 
 constexpr uint32_t kMagic = 0xc001beef;
+
 constexpr Options kNoGcOptions{
     .gc_on_write = GargbageCollectOnWrite::kDisabled,
+    .recovery = ErrorRecovery::kManual,
+    .verify_on_read = true,
+    .verify_on_write = true,
+};
+
+constexpr Options kRecoveryNoGcOptions{
+    .gc_on_write = GargbageCollectOnWrite::kDisabled,
+    .recovery = ErrorRecovery::kLazy,
     .verify_on_read = true,
     .verify_on_write = true,
 };
@@ -211,8 +220,8 @@
   flash_.buffer()[1025] = byte(0xef);
 
   ASSERT_EQ(Status::DATA_LOSS, kvs_.Init());
-  EXPECT_EQ(Status::RESOURCE_EXHAUSTED, kvs_.Put("hello", ByteStr("world")));
-  EXPECT_EQ(Status::RESOURCE_EXHAUSTED, kvs_.Put("a", ByteStr("b")));
+  EXPECT_EQ(Status::FAILED_PRECONDITION, kvs_.Put("hello", ByteStr("world")));
+  EXPECT_EQ(Status::FAILED_PRECONDITION, kvs_.Put("a", ByteStr("b")));
 
   // Existing valid entries should still be readable.
   EXPECT_EQ(1u, kvs_.size());
@@ -230,8 +239,8 @@
   InitFlashTo(AsBytes(kEntry1, kEntry2));
 
   // Corrupt all of the 4 512-byte flash sectors. Leave the pre-init entries
-  // intact. A corrupt sector without entries should be GC'ed on init because
-  // the KVS must maintain one empty sector at all times.
+  // intact. The KVS should be unavailable because recovery is set to manual
+  // and the KVS cannot maintain the required one empty sector.
   flash_.buffer()[64] = byte(0xef);
   flash_.buffer()[513] = byte(0xef);
   flash_.buffer()[1025] = byte(0xef);
@@ -241,7 +250,7 @@
 
   auto stats = kvs_.GetStorageStats();
   EXPECT_EQ(64u, stats.in_use_bytes);
-  EXPECT_EQ(3 * 512u - 64u, stats.reclaimable_bytes);
+  EXPECT_EQ(4 * 512u - 64u, stats.reclaimable_bytes);
   EXPECT_EQ(0u, stats.writable_bytes);
 }
 
@@ -281,8 +290,8 @@
 
   auto stats = kvs_.GetStorageStats();
   EXPECT_EQ(stats.in_use_bytes, 0u);
-  EXPECT_EQ(stats.reclaimable_bytes, 32u);
-  EXPECT_EQ(stats.writable_bytes, 512u * 3 - 32);
+  EXPECT_EQ(stats.reclaimable_bytes, 512u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 2);
 
   // The bytes were marked used, so a new key should not overlap with the bytes
   // from the failed Put.
@@ -290,8 +299,201 @@
 
   stats = kvs_.GetStorageStats();
   EXPECT_EQ(stats.in_use_bytes, (32u * kvs_.redundancy()));
-  EXPECT_EQ(stats.reclaimable_bytes, 32u);
-  EXPECT_EQ(stats.writable_bytes, 512u * 3 - (32 + (32 * kvs_.redundancy())));
+  EXPECT_EQ(stats.reclaimable_bytes, 512u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 2 - (32 * kvs_.redundancy()));
+}
+
+class KvsErrorRecovery : public ::testing::Test {
+ protected:
+  KvsErrorRecovery()
+      : flash_(internal::Entry::kMinAlignmentBytes),
+        partition_(&flash_),
+        kvs_(&partition_,
+             {.magic = kMagic, .checksum = &checksum},
+             kRecoveryNoGcOptions) {}
+
+  void InitFlashTo(span<const byte> contents) {
+    partition_.Erase();
+    std::memcpy(flash_.buffer().data(), contents.data(), contents.size());
+  }
+
+  FakeFlashBuffer<512, 4> flash_;
+  FlashPartition partition_;
+  KeyValueStoreBuffer<kMaxEntries, kMaxUsableSectors> kvs_;
+};
+
+TEST_F(KvsErrorRecovery, Init_Ok) {
+  InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+  EXPECT_EQ(Status::OK, kvs_.Init());
+  byte buffer[64];
+  EXPECT_EQ(Status::OK, kvs_.Get("key1", buffer).status());
+  EXPECT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+}
+
+TEST_F(KvsErrorRecovery, Init_DuplicateEntries_RecoversDuringInit) {
+  InitFlashTo(AsBytes(kEntry1, kEntry1));
+
+  EXPECT_EQ(Status::OK, kvs_.Init());
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 1u);
+
+  byte buffer[64];
+  EXPECT_EQ(Status::OK, kvs_.Get("key1", buffer).status());
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("k2", buffer).status());
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptEntry_FindsSubsequentValidEntry) {
+  // Corrupt each byte in the first entry once.
+  for (size_t i = 0; i < kEntry1.size(); ++i) {
+    InitFlashTo(AsBytes(kEntry1, kEntry2));
+    flash_.buffer()[i] = byte(int(flash_.buffer()[i]) + 1);
+
+    ASSERT_EQ(Status::OK, kvs_.Init());
+    byte buffer[64];
+    ASSERT_EQ(Status::NOT_FOUND, kvs_.Get("key1", buffer).status());
+    ASSERT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+
+    auto stats = kvs_.GetStorageStats();
+    // One valid entry.
+    ASSERT_EQ(32u, stats.in_use_bytes);
+    // The sector with corruption should have been recovered.
+    ASSERT_EQ(0u, stats.reclaimable_bytes);
+    ASSERT_EQ(1u, stats.corrupt_sectors_recovered);
+  }
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptEntry_CorrectlyAccountsForSectorSize) {
+  InitFlashTo(AsBytes(kEntry1, kEntry2, kEntry3, kEntry4));
+
+  // Corrupt the first and third entries.
+  flash_.buffer()[9] = byte(0xef);
+  flash_.buffer()[67] = byte(0xef);
+
+  ASSERT_EQ(Status::OK, kvs_.Init());
+
+  EXPECT_EQ(2u, kvs_.size());
+
+  byte buffer[64];
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("key1", buffer).status());
+  EXPECT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("k3y", buffer).status());
+  EXPECT_EQ(Status::OK, kvs_.Get("4k", buffer).status());
+
+  auto stats = kvs_.GetStorageStats();
+  ASSERT_EQ(64u, stats.in_use_bytes);
+  ASSERT_EQ(0u, stats.reclaimable_bytes);
+  ASSERT_EQ(1472u, stats.writable_bytes);
+  ASSERT_EQ(1u, stats.corrupt_sectors_recovered);
+}
+
+TEST_F(KvsErrorRecovery, Init_ReadError_IsNotInitialized) {
+  InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+  flash_.InjectReadError(
+      FlashError::InRange(Status::UNAUTHENTICATED, kEntry1.size()));
+
+  EXPECT_EQ(Status::UNKNOWN, kvs_.Init());
+  EXPECT_FALSE(kvs_.initialized());
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptSectors_ShouldBeUnwritable) {
+  InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+  // Corrupt 3 of the 4 512-byte flash sectors. Corrupt sectors should be
+  // recovered via garbage collection.
+  flash_.buffer()[1] = byte(0xef);
+  flash_.buffer()[513] = byte(0xef);
+  flash_.buffer()[1025] = byte(0xef);
+
+  ASSERT_EQ(Status::OK, kvs_.Init());
+  EXPECT_EQ(Status::OK, kvs_.Put("hello", ByteStr("world")));
+  EXPECT_EQ(Status::OK, kvs_.Put("a", ByteStr("b")));
+
+  // Existing valid entries should still be readable.
+  EXPECT_EQ(3u, kvs_.size());
+  byte buffer[64];
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("key1", buffer).status());
+  EXPECT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(96u, stats.in_use_bytes);
+  EXPECT_EQ(0u, stats.reclaimable_bytes);
+  EXPECT_EQ(1440u, stats.writable_bytes);
+  ASSERT_EQ(3u, stats.corrupt_sectors_recovered);
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptSectors_ShouldRecoverOne) {
+  InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+  // Corrupt all of the 4 512-byte flash sectors. Leave the pre-init entries
+  // intact. As part of recovery, all corrupt sectors should be garbage
+  // collected.
+  flash_.buffer()[64] = byte(0xef);
+  flash_.buffer()[513] = byte(0xef);
+  flash_.buffer()[1025] = byte(0xef);
+  flash_.buffer()[1537] = byte(0xef);
+
+  ASSERT_EQ(Status::OK, kvs_.Init());
+
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(64u, stats.in_use_bytes);
+  EXPECT_EQ(0u, stats.reclaimable_bytes);
+  EXPECT_EQ(3 * 512u - 64u, stats.writable_bytes);
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptKey_RevertsToPreviousVersion) {
+  constexpr auto kVersion7 =
+      MakeValidEntry(kMagic, 7, "my_key", ByteStr("version 7"));
+  constexpr auto kVersion8 =
+      MakeValidEntry(kMagic, 8, "my_key", ByteStr("version 8"));
+
+  InitFlashTo(AsBytes(kVersion7, kVersion8));
+
+  // Corrupt a byte of entry version 8 (addresses 32-63).
+  flash_.buffer()[34] = byte(0xef);
+
+  ASSERT_EQ(Status::OK, kvs_.Init());
+
+  char buffer[64] = {};
+
+  EXPECT_EQ(1u, kvs_.size());
+
+  auto result = kvs_.Get("my_key", as_writable_bytes(span(buffer)));
+  EXPECT_EQ(Status::OK, result.status());
+  EXPECT_EQ(sizeof("version 7") - 1, result.size());
+  EXPECT_STREQ("version 7", buffer);
+
+  EXPECT_EQ(32u, kvs_.GetStorageStats().in_use_bytes);
+}
+
+TEST_F(KvsErrorRecovery, Put_WriteFailure_EntryNotAddedButBytesMarkedWritten) {
+  ASSERT_EQ(Status::OK, kvs_.Init());
+  flash_.InjectWriteError(FlashError::Unconditional(Status::UNAVAILABLE, 1));
+
+  EXPECT_EQ(Status::UNAVAILABLE, kvs_.Put("key1", ByteStr("value1")));
+  EXPECT_EQ(true, kvs_.error_detected());
+
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("key1", span<byte>()).status());
+  ASSERT_TRUE(kvs_.empty());
+
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, 0u);
+  EXPECT_EQ(stats.reclaimable_bytes, 512u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 2);
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 0u);
+
+  // The bytes were marked used, so a new key should not overlap with the bytes
+  // from the failed Put.
+  EXPECT_EQ(Status::OK, kvs_.Put("key1", ByteStr("value1")));
+
+  stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (32u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 512u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 2 - (32 * kvs_.redundancy()));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 0u);
 }
 
 constexpr uint32_t kAltMagic = 0xbadD00D;
@@ -328,7 +530,7 @@
                  {.magic = kAltMagic, .checksum = &alt_checksum},
                  {.magic = kNoChecksumMagic, .checksum = nullptr},
              }},
-             kNoGcOptions) {
+             kRecoveryNoGcOptions) {
     partition_.Erase();
     std::memcpy(flash_.buffer().data(),
                 kInitialContents.data(),
diff --git a/pw_kvs/key_value_store_map_test.cc b/pw_kvs/key_value_store_map_test.cc
index 1f46d14..e2d8f68 100644
--- a/pw_kvs/key_value_store_map_test.cc
+++ b/pw_kvs/key_value_store_map_test.cc
@@ -319,7 +319,7 @@
 
   void GCFull() {
     StartOperation("GCFull");
-    Status status = kvs_.GarbageCollectFull();
+    Status status = kvs_.FullMaintenance();
     EXPECT_EQ(Status::OK, status);
     KeyValueStore::StorageStats post_stats = kvs_.GetStorageStats();
     EXPECT_EQ(post_stats.reclaimable_bytes, 0U);
@@ -329,7 +329,7 @@
   void GCPartial() {
     StartOperation("GCPartial");
     KeyValueStore::StorageStats pre_stats = kvs_.GetStorageStats();
-    Status status = kvs_.GarbageCollectPartial();
+    Status status = kvs_.PartialMaintenance();
     KeyValueStore::StorageStats post_stats = kvs_.GetStorageStats();
     if (pre_stats.reclaimable_bytes != 0) {
       EXPECT_EQ(Status::OK, status);
diff --git a/pw_kvs/public/pw_kvs/internal/entry_cache.h b/pw_kvs/public/pw_kvs/internal/entry_cache.h
index 48688a6..fde68fe 100644
--- a/pw_kvs/public/pw_kvs/internal/entry_cache.h
+++ b/pw_kvs/public/pw_kvs/internal/entry_cache.h
@@ -58,6 +58,9 @@
     addresses_ = span(addresses_.begin(), addresses_.size() + 1);
   }
 
+  // Removes an address from the entry metadata.
+  void RemoveAddress(Address address_to_remove);
+
   // Resets the KeyDescrtiptor and addresses to refer to the provided
   // KeyDescriptor and address.
   void Reset(const KeyDescriptor& descriptor, Address address);
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 69193e4..62c037d 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -49,10 +49,15 @@
   // Immediately do full recovery of any errors that are detected.
   kImmediate,
 
-  // Recover from errors, but do some time consuming steps at a later time. Such
-  // as waiting to garbage collect sectors with corrupt entries until the next
-  // garbage collection.
+  // Recover from errors, but delay time-consuming recovery steps until later,
+  // as part of other time-consuming operations. For example, wait to garbage
+  // collect sectors with corrupt entries until the next garbage collection.
   kLazy,
+
+  // Only recover from errors when manually triggered as part of maintenance
+  // operations. This is not recommended for normal use, only for test or
+  // read-only use.
+  kManual,
 };
 
 struct Options {
@@ -90,7 +95,9 @@
   //
   Status Init();
 
-  bool initialized() const { return initialized_; }
+  bool initialized() const {
+    return initialized_ == InitializationState::kReady;
+  }
 
   // Reads the value of an entry in the KVS. The value is read into the provided
   // buffer and the number of bytes read is returned. If desired, the read can
@@ -171,14 +178,17 @@
   //
   StatusWithSize ValueSize(std::string_view key) const;
 
-  // Perform garbage collection of all reclaimable space in the KVS.
-  Status GarbageCollectFull();
+  // Perform all maintenance possible, including all needed repair of
+  // corruption and garbage collection of all reclaimable space in the KVS.
+  // When configured for manual recovery, this is the only way KVS repair is
+  // triggered.
+  Status FullMaintenance();
 
-  // Perform garbage collection of part of the KVS, typically a single sector or
-  // similar unit that makes sense for the KVS implementation.
-  Status GarbageCollectPartial() {
-    return GarbageCollectPartial(span<const Address>());
-  }
+  // Perform a portion of KVS maintenance. If configured for at least lazy
+  // recovery, does any needed repair of corruption. Garbage collects part of
+  // the KVS, typically a single sector or a similar unit that makes sense for
+  // the KVS implementation.
+  Status PartialMaintenance() { return GarbageCollect(span<const Address>()); }
 
   void LogDebugInfo() const;
 
@@ -280,6 +290,8 @@
     size_t writable_bytes;
     size_t in_use_bytes;
     size_t reclaimable_bytes;
+    size_t corrupt_sectors_recovered;
+    size_t missing_redundant_entries_recovered;
   };
 
   StorageStats GetStorageStats() const;
@@ -287,6 +299,9 @@
   // Level of redundancy to use for writing entries.
   size_t redundancy() const { return entry_cache_.redundancy(); }
 
+  bool error_detected() const { return error_detected_; }
+  bool CheckForErrors();
+
  protected:
   using Address = FlashPartition::Address;
   using Entry = internal::Entry;
@@ -340,7 +355,8 @@
                       void* value,
                       size_t size_bytes) const;
 
-  Status CheckOperation(std::string_view key) const;
+  Status CheckWriteOperation(std::string_view key) const;
+  Status CheckReadOperation(std::string_view key) const;
 
   Status WriteEntryForExistingKey(EntryMetadata& metadata,
                                   EntryState new_state,
@@ -364,6 +380,8 @@
                            size_t entry_size,
                            span<const Address> addresses_to_skip);
 
+  Status MarkSectorCorruptIfNotOk(Status status, SectorDescriptor* sector);
+
   Status AppendEntry(const Entry& entry,
                      std::string_view key,
                      span<const std::byte> value);
@@ -372,7 +390,9 @@
                        KeyValueStore::Address& address,
                        span<const Address> addresses_to_skip);
 
-  Status GarbageCollectPartial(span<const Address> addresses_to_skip);
+  // Find and garbage collect a single sector that does not include an address
+  // to skip.
+  Status GarbageCollect(span<const Address> addresses_to_skip);
 
   Status RelocateKeyAddressesInSector(SectorDescriptor& sector_to_gc,
                                       const EntryMetadata& descriptor,
@@ -381,7 +401,15 @@
   Status GarbageCollectSector(SectorDescriptor& sector_to_gc,
                               span<const Address> addresses_to_skip);
 
-  Status Repair() { return Status::UNIMPLEMENTED; }
+  Status AddRedundantEntries(EntryMetadata& metadata);
+
+  Status RepairCorruptSectors();
+
+  Status EnsureFreeSectorExists();
+
+  Status EnsureEntryRedundancy();
+
+  Status Repair();
 
   internal::Entry CreateEntry(Address address,
                               std::string_view key,
@@ -403,10 +431,27 @@
 
   Options options_;
 
-  bool initialized_;
+  enum class InitializationState {
+    // KVS Init() has not been called and KVS is not usable.
+    kNotInitialized,
+
+    // KVS Init() has run, but not all of the invariants needed for an
+    // operating KVS are met. The KVS is not writable, but reads are possible.
+    kNeedsMaintenance,
+
+    // KVS is fully ready for use.
+    kReady,
+  };
+  InitializationState initialized_;
 
   bool error_detected_;
 
+  struct ErrorStats {
+    size_t corrupt_sectors_recovered;
+    size_t missing_redundant_entries_recovered;
+  };
+  ErrorStats error_stats_;
+
   uint32_t last_transaction_id_;
 };