pw_kvs: Add better error handling during read, write, and init

- When reading a key entry, use all of the redundant copies before
  failing.
- Add CopyEntryToSector helper that makes a copy of a key to another
  sector.
- Move FindExisting method and add wrapper for Find from EntryCache in
  mto KVS to allow the methods to report errors that are found.

Change-Id: Ia2da21e4ec3ad164b8620aa3949c524c0d916835
diff --git a/pw_kvs/entry_cache.cc b/pw_kvs/entry_cache.cc
index 6d80bb6..4528ab2 100644
--- a/pw_kvs/entry_cache.cc
+++ b/pw_kvs/entry_cache.cc
@@ -58,42 +58,62 @@
   addresses_ = addresses_.first(1);
 }
 
-Status EntryCache::Find(FlashPartition& partition,
-                        string_view key,
-                        EntryMetadata* metadata) const {
+StatusWithSize EntryCache::Find(FlashPartition& partition,
+                                const Sectors& sectors,
+                                const EntryFormats& formats,
+                                string_view key,
+                                EntryMetadata* metadata) const {
   const uint32_t hash = internal::Hash(key);
   Entry::KeyBuffer key_buffer;
+  bool error_detected = false;
 
   for (size_t i = 0; i < descriptors_.size(); ++i) {
     if (descriptors_[i].key_hash == hash) {
-      TRY(Entry::ReadKey(
-          partition, *first_address(i), key.size(), key_buffer.data()));
+      bool key_found = false;
+      string_view read_key;
 
-      if (key == string_view(key_buffer.data(), key.size())) {
+      for (Address address : addresses(i)) {
+        Status read_result =
+            Entry::ReadKey(partition, address, key.size(), key_buffer.data());
+
+        read_key = string_view(key_buffer.data(), key.size());
+
+        if (read_result.ok() && hash == internal::Hash(read_key)) {
+          key_found = true;
+          break;
+        } else {
+          // A hash mismatch can be caused by reading invalid data or a key hash
+          // collision of keys with differing size. To verify the data read from
+          // flash is good, validate the entry.
+          Entry entry;
+          read_result = Entry::Read(partition, address, formats, &entry);
+          if (read_result.ok() && entry.VerifyChecksumInFlash().ok()) {
+            key_found = true;
+            break;
+          }
+
+          PW_LOG_WARN(
+              "   Found corrupt entry, invalidating this copy of the key");
+          error_detected = true;
+          sectors.FromAddress(address).mark_corrupt();
+        }
+      }
+      size_t error_val = error_detected ? 1 : 0;
+
+      if (!key_found) {
+        PW_LOG_ERROR("No valid entries for key. Data has been lost!");
+        return StatusWithSize(Status::DATA_LOSS, error_val);
+      } else if (key == read_key) {
         PW_LOG_DEBUG("Found match for key hash 0x%08" PRIx32, hash);
         *metadata = EntryMetadata(descriptors_[i], addresses(i));
-        return Status::OK;
+        return StatusWithSize(Status::OK, error_val);
       } else {
         PW_LOG_WARN("Found key hash collision for 0x%08" PRIx32, hash);
-        return Status::ALREADY_EXISTS;
+        return StatusWithSize(Status::ALREADY_EXISTS, error_val);
       }
     }
   }
-  return Status::NOT_FOUND;
-}
-
-Status EntryCache::FindExisting(FlashPartition& partition,
-                                string_view key,
-                                EntryMetadata* metadata) const {
-  Status status = Find(partition, key, metadata);
-
-  // If the key's hash collides with an existing key or if the key is deleted,
-  // treat it as if it is not in the KVS.
-  if (status == Status::ALREADY_EXISTS ||
-      (status.ok() && metadata->state() == EntryState::kDeleted)) {
-    return Status::NOT_FOUND;
-  }
-  return status;
+  return StatusWithSize::NOT_FOUND;
 }
 
 EntryMetadata EntryCache::AddNew(const KeyDescriptor& descriptor,
diff --git a/pw_kvs/entry_cache_test.cc b/pw_kvs/entry_cache_test.cc
index 82678fb..76bf5dc 100644
--- a/pw_kvs/entry_cache_test.cc
+++ b/pw_kvs/entry_cache_test.cc
@@ -194,6 +194,8 @@
   EXPECT_EQ(99u, it->first_address());
 }
 
+constexpr size_t kSectorSize = 64;
+
 constexpr auto kTheEntry = AsBytes(uint32_t(12345),  // magic
                                    uint32_t(0),      // checksum
                                    uint8_t(0),       // alignment (16 B)
@@ -201,7 +203,9 @@
                                    uint16_t(0),                   // value size
                                    uint32_t(123),  // transaction ID
                                    ByteStr(kTheKey));
-constexpr std::array<byte, 16 - kTheEntry.size() % 16> kPadding1{};
+constexpr std::array<byte, kSectorSize - kTheEntry.size() % kSectorSize>
+    kPadding1{};
+constexpr size_t kSize1 = kTheEntry.size() + kPadding1.size();
 
 constexpr char kCollision1[] = "9FDC";
 constexpr char kCollision2[] = "axzzK";
@@ -214,7 +218,9 @@
             uint16_t(0),                       // value size
             uint32_t(123),                     // transaction ID
             ByteStr(kCollision1));
-constexpr std::array<byte, 16 - kCollisionEntry.size() % 16> kPadding2{};
+constexpr std::array<byte, kSectorSize - kCollisionEntry.size() % kSectorSize>
+    kPadding2{};
+constexpr size_t kSize2 = kCollisionEntry.size() + kPadding2.size();
 
 constexpr auto kDeletedEntry =
     AsBytes(uint32_t(12345),                  // magic
@@ -224,29 +230,65 @@
             uint16_t(0xffff),                 // value size (deleted)
             uint32_t(123),                    // transaction ID
             ByteStr("delorted"));
+constexpr std::array<byte, kSectorSize - kDeletedEntry.size() % kSectorSize>
+    kPadding3{};
+
+constexpr EntryFormat kFormat{.magic = uint32_t(12345), .checksum = nullptr};
 
 class InitializedEntryCache : public EmptyEntryCache {
  protected:
   static_assert(Hash(kCollision1) == Hash(kCollision2));
 
   InitializedEntryCache()
-      : flash_(AsBytes(
-            kTheEntry, kPadding1, kCollisionEntry, kPadding2, kDeletedEntry)),
-        partition_(&flash_) {
-    entries_.AddNew(kDescriptor, 0);
+      : flash_(AsBytes(kTheEntry,
+                       kPadding1,
+                       kTheEntry,
+                       kPadding1,
+                       kCollisionEntry,
+                       kPadding2,
+                       kDeletedEntry,
+                       kPadding3)),
+        partition_(&flash_),
+        sectors_(sector_descriptors_, partition_, nullptr),
+        format_(kFormat) {
+    sectors_.Reset();
+    size_t address = 0;
+    auto entry = entries_.AddNew(kDescriptor, address);
+
+    address += kSize1;
+    entry.AddNewAddress(kSize1);
+
+    address += kSize1;
     entries_.AddNew({.key_hash = Hash(kCollision1),
                      .transaction_id = 125,
                      .state = EntryState::kDeleted},
-                    kTheEntry.size() + kPadding1.size());
+                    address);
+
+    address += kSize2;
     entries_.AddNew({.key_hash = Hash("delorted"),
                      .transaction_id = 256,
                      .state = EntryState::kDeleted},
-                    kTheEntry.size() + kPadding1.size() +
-                        kCollisionEntry.size() + kPadding2.size());
+                    address);
   }
 
-  FakeFlashBuffer<64, 128> flash_;
+  void CheckForCorruptSectors(SectorDescriptor* sector1 = nullptr,
+                              SectorDescriptor* sector2 = nullptr) {
+    for (auto& sector : sectors_) {
+      bool expect_corrupt =
+          (&sector == sector1 || &sector == sector2) ? true : false;
+
+      EXPECT_EQ(expect_corrupt, sector.corrupt());
+    }
+  }
+
+  static constexpr size_t kTotalSectors = 128;
+  FakeFlashBuffer<kSectorSize, kTotalSectors> flash_;
   FlashPartition partition_;
+
+  Vector<SectorDescriptor, kTotalSectors> sector_descriptors_;
+  Sectors sectors_;
+
+  EntryFormats format_;
 };
 
 TEST_F(InitializedEntryCache, EntryCounts) {
@@ -265,51 +307,80 @@
 
 TEST_F(InitializedEntryCache, Find_PresentEntry) {
   EntryMetadata metadata;
-  ASSERT_EQ(Status::OK, entries_.Find(partition_, kTheKey, &metadata));
+
+  StatusWithSize result =
+      entries_.Find(partition_, sectors_, format_, kTheKey, &metadata);
+
+  ASSERT_EQ(Status::OK, result.status());
+  EXPECT_EQ(0u, result.size());
   EXPECT_EQ(Hash(kTheKey), metadata.hash());
   EXPECT_EQ(EntryState::kValid, metadata.state());
+  CheckForCorruptSectors();
+}
+
+TEST_F(InitializedEntryCache, Find_PresentEntryWithSingleReadError) {
+  // Inject 2 read errors so that the initial key read and the follow-up full
+  // read of the first entry fail.
+  flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 2));
+
+  EntryMetadata metadata;
+
+  StatusWithSize result =
+      entries_.Find(partition_, sectors_, format_, kTheKey, &metadata);
+
+  ASSERT_EQ(Status::OK, result.status());
+  EXPECT_EQ(1u, result.size());
+  EXPECT_EQ(Hash(kTheKey), metadata.hash());
+  EXPECT_EQ(EntryState::kValid, metadata.state());
+  CheckForCorruptSectors(&sectors_.FromAddress(0));
+}
+
+TEST_F(InitializedEntryCache, Find_PresentEntryWithMultiReadError) {
+  flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 4));
+
+  EntryMetadata metadata;
+
+  StatusWithSize result =
+      entries_.Find(partition_, sectors_, format_, kTheKey, &metadata);
+
+  ASSERT_EQ(Status::DATA_LOSS, result.status());
+  EXPECT_EQ(1u, result.size());
+  CheckForCorruptSectors(&sectors_.FromAddress(0),
+                         &sectors_.FromAddress(kSize1));
 }
 
 TEST_F(InitializedEntryCache, Find_DeletedEntry) {
   EntryMetadata metadata;
-  ASSERT_EQ(Status::OK, entries_.Find(partition_, "delorted", &metadata));
+
+  StatusWithSize result =
+      entries_.Find(partition_, sectors_, format_, "delorted", &metadata);
+
+  ASSERT_EQ(Status::OK, result.status());
+  EXPECT_EQ(0u, result.size());
   EXPECT_EQ(Hash("delorted"), metadata.hash());
   EXPECT_EQ(EntryState::kDeleted, metadata.state());
+  CheckForCorruptSectors();
 }
 
 TEST_F(InitializedEntryCache, Find_MissingEntry) {
   EntryMetadata metadata;
-  ASSERT_EQ(Status::NOT_FOUND, entries_.Find(partition_, "3.141", &metadata));
+
+  StatusWithSize result =
+      entries_.Find(partition_, sectors_, format_, "3.141", &metadata);
+
+  ASSERT_EQ(Status::NOT_FOUND, result.status());
+  EXPECT_EQ(0u, result.size());
+  CheckForCorruptSectors();
 }
 
 TEST_F(InitializedEntryCache, Find_Collision) {
   EntryMetadata metadata;
-  EXPECT_EQ(Status::ALREADY_EXISTS,
-            entries_.Find(partition_, kCollision2, &metadata));
-}
 
-TEST_F(InitializedEntryCache, FindExisting_PresentEntry) {
-  EntryMetadata metadata;
-  ASSERT_EQ(Status::OK, entries_.FindExisting(partition_, kTheKey, &metadata));
-  EXPECT_EQ(Hash(kTheKey), metadata.hash());
-}
-
-TEST_F(InitializedEntryCache, FindExisting_DeletedEntry) {
-  EntryMetadata metadata;
-  ASSERT_EQ(Status::NOT_FOUND,
-            entries_.FindExisting(partition_, "delorted", &metadata));
-}
-
-TEST_F(InitializedEntryCache, FindExisting_MissingEntry) {
-  EntryMetadata metadata;
-  ASSERT_EQ(Status::NOT_FOUND,
-            entries_.FindExisting(partition_, "3.141", &metadata));
-}
-
-TEST_F(InitializedEntryCache, FindExisting_Collision) {
-  EntryMetadata metadata;
-  EXPECT_EQ(Status::NOT_FOUND,
-            entries_.FindExisting(partition_, kCollision2, &metadata));
+  StatusWithSize result =
+      entries_.Find(partition_, sectors_, format_, kCollision2, &metadata);
+  EXPECT_EQ(Status::ALREADY_EXISTS, result.status());
+  EXPECT_EQ(0u, result.size());
+  CheckForCorruptSectors();
 }
 
 }  // namespace
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 9fa7199..2bc6ddf 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -56,7 +56,6 @@
 Status KeyValueStore::Init() {
   initialized_ = InitializationState::kNotInitialized;
   error_detected_ = false;
-  error_stats_ = {};
   last_transaction_id_ = 0;
   sectors_.Reset();
   entry_cache_.Reset();
@@ -174,17 +173,39 @@
   DBG("Second pass: Count valid bytes in each sector");
   Address newest_key = 0;
 
-  // For every valid entry, count the valid bytes in that sector. Track which
-  // entry has the newest transaction ID for initializing last_new_sector_.
-  for (const EntryMetadata& metadata : entry_cache_) {
+  // For every valid entry, for each address, count the valid bytes in that
+  // sector. If the address fails to read, remove the address and mark the
+  // sector as corrupt. Track which entry has the newest transaction ID for
+  // initializing last_new_sector_.
+  for (EntryMetadata& metadata : entry_cache_) {
     if (metadata.addresses().size() < redundancy()) {
       error_detected_ = true;
     }
-    for (Address address : metadata.addresses()) {
+    size_t index = 0;
+    while (index < metadata.addresses().size()) {
+      Address address = metadata.addresses()[index];
       Entry entry;
-      TRY(Entry::Read(partition_, address, formats_, &entry));
-      sectors_.FromAddress(address).AddValidBytes(entry.size());
+
+      Status read_result = Entry::Read(partition_, address, formats_, &entry);
+
+      SectorDescriptor& sector = sectors_.FromAddress(address);
+
+      if (read_result.ok()) {
+        sector.AddValidBytes(entry.size());
+        index++;
+      } else {
+        corrupt_entries++;
+        total_corrupt_bytes += sector.writable_bytes();
+        error_detected_ = true;
+        sector.mark_corrupt();
+
+        // Remove the bad address and stay at this index. The removal
+        // replaces out the removed address with the back address so
+        // this index needs to be rechecked with the new address.
+        metadata.RemoveAddress(address);
+      }
     }
+
     if (metadata.IsNewerThan(last_transaction_id_)) {
       last_transaction_id_ = metadata.transaction_id();
       newest_key = metadata.addresses().back();
@@ -201,14 +222,15 @@
     initialized_ = InitializationState::kReady;
   } else {
     if (options_.recovery != ErrorRecovery::kManual) {
-      WRN("KVS init: Corruption detected, beginning repair");
+      WRN("KVS init: Corruption detected, beginning repair. Found %zu corrupt "
+          "bytes and %d corrupt entries.",
+          total_corrupt_bytes,
+          corrupt_entries);
       Status recovery_status = Repair();
 
       if (recovery_status.ok()) {
         WRN("KVS init: Corruption detected and fully repaired");
         initialized_ = InitializationState::kReady;
-        total_corrupt_bytes = 0;
-        corrupt_entries = 0;
       } else if (recovery_status == Status::RESOURCE_EXHAUSTED) {
         WRN("KVS init: Unable to maintain required free sector");
         initialized_ = InitializationState::kNeedsMaintenance;
@@ -230,11 +252,9 @@
       partition_.sector_size_bytes());
 
   // Report any corruption was not repaired.
-  if (total_corrupt_bytes > 0) {
-    WRN("Found %zu corrupt bytes and %d corrupt entries during init process; "
-        "some keys may be missing",
-        total_corrupt_bytes,
-        corrupt_entries);
+  if (error_detected_) {
+    WRN("KVS init: Corruption found but not repaired, KVS unavailable until "
+        "successful maintenance.");
     return Status::DATA_LOSS;
   }
 
@@ -269,12 +289,14 @@
   return stats;
 }
 
+// Check KVS for any error conditions. Primarily intended for test and
+// internal use.
 bool KeyValueStore::CheckForErrors() {
   // Check for corrupted sectors
   for (SectorDescriptor& sector : sectors_) {
     if (sector.corrupt()) {
       error_detected_ = true;
-      break;
+      return error_detected();
     }
   }
 
@@ -283,7 +305,7 @@
     for (const EntryMetadata& metadata : entry_cache_) {
       if (metadata.addresses().size() < redundancy()) {
         error_detected_ = true;
-        break;
+        return error_detected();
       }
     }
   }
@@ -343,7 +365,7 @@
   TRY_WITH_SIZE(CheckReadOperation(key));
 
   EntryMetadata metadata;
-  TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
+  TRY_WITH_SIZE(FindExisting(key, &metadata));
 
   return Get(key, metadata, value_buffer, offset_bytes);
 }
@@ -362,7 +384,7 @@
   }
 
   EntryMetadata metadata;
-  Status status = entry_cache_.Find(partition_, key, &metadata);
+  Status status = FindEntry(key, &metadata);
 
   if (status.ok()) {
     // TODO: figure out logging how to support multiple addresses.
@@ -384,7 +406,7 @@
   TRY(CheckWriteOperation(key));
 
   EntryMetadata metadata;
-  TRY(entry_cache_.FindExisting(partition_, key, &metadata));
+  TRY(FindExisting(key, &metadata));
 
   // TODO: figure out logging how to support multiple addresses.
   DBG("Writing tombstone for key 0x%08" PRIx32 " in %zu sectors including %u",
@@ -398,11 +420,7 @@
   key_buffer_.fill('\0');
 
   Entry entry;
-  // TODO: add support for using one of the redundant entries if reading the
-  // first copy fails.
-  if (Entry::Read(
-          kvs_.partition_, iterator_->first_address(), kvs_.formats_, &entry)
-          .ok()) {
+  if (kvs_.ReadEntry(*iterator_, entry).ok()) {
     entry.ReadKey(key_buffer_);
   }
 }
@@ -429,20 +447,61 @@
   TRY_WITH_SIZE(CheckReadOperation(key));
 
   EntryMetadata metadata;
-  TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
+  TRY_WITH_SIZE(FindExisting(key, &metadata));
 
   return ValueSize(metadata);
 }
 
+Status KeyValueStore::ReadEntry(const EntryMetadata& metadata,
+                                Entry& entry) const {
+  // Try to read an entry
+  Status read_result = Status::DATA_LOSS;
+  for (Address address : metadata.addresses()) {
+    read_result = Entry::Read(partition_, address, formats_, &entry);
+    if (read_result.ok()) {
+      return read_result;
+    }
+
+    // Found a bad address. Set the sector as corrupt.
+    error_detected_ = true;
+    sectors_.FromAddress(address).mark_corrupt();
+  }
+
+  ERR("No valid entries for key. Data has been lost!");
+  return read_result;
+}
+
+Status KeyValueStore::FindEntry(string_view key,
+                                EntryMetadata* found_entry) const {
+  StatusWithSize find_result =
+      entry_cache_.Find(partition_, sectors_, formats_, key, found_entry);
+
+  if (find_result.size() > 0u) {
+    error_detected_ = true;
+  }
+  return find_result.status();
+}
+
+Status KeyValueStore::FindExisting(string_view key,
+                                   EntryMetadata* metadata) const {
+  Status status = FindEntry(key, metadata);
+
+  // If the key's hash collides with an existing key or if the key is deleted,
+  // treat it as if it is not in the KVS.
+  if (status == Status::ALREADY_EXISTS ||
+      (status.ok() && metadata->state() == EntryState::kDeleted)) {
+    return Status::NOT_FOUND;
+  }
+  return status;
+}
+
 StatusWithSize KeyValueStore::Get(string_view key,
                                   const EntryMetadata& metadata,
                                   span<std::byte> value_buffer,
                                   size_t offset_bytes) const {
   Entry entry;
-  // TODO: add support for using one of the redundant entries if reading the
-  // first copy fails.
-  TRY_WITH_SIZE(
-      Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+
+  TRY_WITH_SIZE(ReadEntry(metadata, entry));
 
   StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
   if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
@@ -464,7 +523,7 @@
   TRY(CheckWriteOperation(key));
 
   EntryMetadata metadata;
-  TRY(entry_cache_.FindExisting(partition_, key, &metadata));
+  TRY(FindExisting(key, &metadata));
 
   return FixedSizeGet(key, metadata, value, size_bytes);
 }
@@ -490,10 +549,7 @@
 
 StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
   Entry entry;
-  // TODO: add support for using one of the redundant entries if reading the
-  // first copy fails.
-  TRY_WITH_SIZE(
-      Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+  TRY_WITH_SIZE(ReadEntry(metadata, entry));
 
   return StatusWithSize(entry.value_size());
 }
@@ -529,9 +585,7 @@
                                                span<const byte> value) {
   // Read the original entry to get the size for sector accounting purposes.
   Entry entry;
-  // TODO: add support for using one of the redundant entries if reading the
-  // first copy fails.
-  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+  TRY(ReadEntry(metadata, entry));
 
   return WriteEntry(key, value, new_state, &metadata, entry.size());
 }
@@ -668,10 +722,7 @@
                                   span<const byte> value) {
   const StatusWithSize result = entry.Write(key, value);
 
-  // Remove any bytes that were written, even if the write was not successful.
-  // This is important to retain the writable space invariant on the sectors.
   SectorDescriptor& sector = sectors_.FromAddress(entry.address());
-  sector.RemoveWritableBytes(result.size());
 
   if (!result.ok()) {
     ERR("Failed to write %zu bytes at %#zx. %zu actually written",
@@ -685,15 +736,36 @@
     TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
   }
 
+  sector.RemoveWritableBytes(result.size());
   sector.AddValidBytes(result.size());
   return Status::OK;
 }
 
+StatusWithSize KeyValueStore::CopyEntryToSector(Entry& entry,
+                                                SectorDescriptor* new_sector,
+                                                Address& new_address) {
+  new_address = sectors_.NextWritableAddress(*new_sector);
+  const StatusWithSize result = entry.Copy(new_address);
+
+  TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+  if (options_.verify_on_write) {
+    TRY_WITH_SIZE(
+        MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+  }
+  // Entry was written successfully; update descriptor's address and the sector
+  // descriptors to reflect the new entry.
+  new_sector->RemoveWritableBytes(result.size());
+  new_sector->AddValidBytes(result.size());
+
+  return result;
+}
+
 Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
                                     KeyValueStore::Address& address,
                                     span<const Address> reserved_addresses) {
   Entry entry;
-  TRY(Entry::Read(partition_, address, formats_, &entry));
+  TRY(ReadEntry(metadata, entry));
 
   // Find a new sector for the entry and write it to the new location. For
   // relocation the find should not not be a sector already containing the key
@@ -706,19 +778,10 @@
   TRY(sectors_.FindSpaceDuringGarbageCollection(
       &new_sector, entry.size(), metadata.addresses(), reserved_addresses));
 
-  const Address new_address = sectors_.NextWritableAddress(*new_sector);
-  const StatusWithSize result = entry.Copy(new_address);
-
-  TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
-
-  if (options_.verify_on_write) {
-    TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
-  }
-  // Entry was written successfully; update descriptor's address and the sector
-  // descriptors to reflect the new entry.
-  new_sector->RemoveWritableBytes(result.size());
-  new_sector->AddValidBytes(result.size());
-  sectors_.FromAddress(address).RemoveValidBytes(result.size());
+  Address new_address;
+  TRY_ASSIGN(const size_t result_size,
+             CopyEntryToSector(entry, new_sector, new_address));
+  sectors_.FromAddress(address).RemoveValidBytes(result_size);
   address = new_address;
 
   return Status::OK;
@@ -730,6 +793,7 @@
   }
 
   DBG("Do full maintenance");
+  CheckForErrors();
 
   if (error_detected_) {
     TRY(Repair());
@@ -784,7 +848,7 @@
 
 Status KeyValueStore::RelocateKeyAddressesInSector(
     SectorDescriptor& sector_to_gc,
-    const EntryMetadata& metadata,
+    EntryMetadata& metadata,
     span<const Address> reserved_addresses) {
   for (FlashPartition::Address& address : metadata.addresses()) {
     if (sectors_.AddressInSector(sector_to_gc, address)) {
@@ -803,7 +867,7 @@
   DBG("  Garbage Collect sector %u", sectors_.Index(sector_to_gc));
   // Step 1: Move any valid entries in the GC sector to other sectors
   if (sector_to_gc.valid_bytes() != 0) {
-    for (const EntryMetadata& metadata : entry_cache_) {
+    for (EntryMetadata& metadata : entry_cache_) {
       TRY(RelocateKeyAddressesInSector(
           sector_to_gc, metadata, reserved_addresses));
     }
@@ -831,10 +895,7 @@
 
   Entry entry;
 
-  // For simplicity use just the first copy. Any known bad copies should have
-  // been removed already.
-  // TODO: Add support to read other copies if needed.
-  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+  TRY(ReadEntry(metadata, entry));
   TRY(entry.VerifyChecksumInFlash());
 
   for (size_t i = metadata.addresses().size();
@@ -842,17 +903,8 @@
        i++) {
     TRY(sectors_.FindSpace(&new_sector, entry.size(), metadata.addresses()));
 
-    const Address new_address = sectors_.NextWritableAddress(*new_sector);
-    const StatusWithSize result = entry.Copy(new_address);
-    TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
-
-    if (options_.verify_on_write) {
-      TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
-    }
-    // Entry was written successfully; update descriptor's address and the
-    // sector descriptors to reflect the new entry.
-    new_sector->RemoveWritableBytes(result.size());
-    new_sector->AddValidBytes(result.size());
+    Address new_address;
+    TRY(CopyEntryToSector(entry, new_sector, new_address));
 
     metadata.AddNewAddress(new_address);
   }
@@ -922,14 +974,14 @@
   Status repair_status = Status::OK;
 
   if (redundancy() == 1) {
-    DBG("   Redundancy not in use, nothing to check");
+    DBG("   Redundancy not in use, nothting to check");
     return Status::OK;
   }
 
-  DBG("   Write any needed additional duplicate copies of key to fulfill %u "
-      "redundancy",
+  DBG("   Write any needed additional duplicate copies of keys to fulfill %u"
+      " redundancy",
       unsigned(redundancy()));
-  for (const EntryMetadata& metadata : entry_cache_) {
+  for (EntryMetadata& metadata : entry_cache_) {
     if (metadata.addresses().size() >= redundancy()) {
       continue;
     }
@@ -937,10 +989,7 @@
     DBG("   Key with %u of %u copies found, adding missing copies",
         unsigned(metadata.addresses().size()),
         unsigned(redundancy()));
-    // TODO: Add non-const iterator to the entry_cache_ and remove this
-    // const_cast.
-    Status fill_status =
-        AddRedundantEntries(const_cast<EntryMetadata&>(metadata));
+    Status fill_status = AddRedundantEntries(metadata);
     if (fill_status.ok()) {
       error_stats_.missing_redundant_entries_recovered += 1;
       DBG("   Key missing copies added");
diff --git a/pw_kvs/key_value_store_binary_format_test.cc b/pw_kvs/key_value_store_binary_format_test.cc
index 8042712..3119edc 100644
--- a/pw_kvs/key_value_store_binary_format_test.cc
+++ b/pw_kvs/key_value_store_binary_format_test.cc
@@ -279,6 +279,9 @@
   EXPECT_EQ(32u, kvs_.GetStorageStats().in_use_bytes);
 }
 
+// The Put_WriteFailure_EntryNotAddedButBytesMarkedWritten test is run with both
+// the KvsErrorRecovery and KvsErrorHandling test fixtures (different KVS
+// configurations).
 TEST_F(KvsErrorHandling, Put_WriteFailure_EntryNotAddedButBytesMarkedWritten) {
   ASSERT_EQ(Status::OK, kvs_.Init());
   flash_.InjectWriteError(FlashError::Unconditional(Status::UNAVAILABLE, 1));
@@ -359,7 +362,7 @@
     ASSERT_EQ(32u, stats.in_use_bytes);
     // The sector with corruption should have been recovered.
     ASSERT_EQ(0u, stats.reclaimable_bytes);
-    ASSERT_EQ(1u, stats.corrupt_sectors_recovered);
+    ASSERT_EQ(i + 1u, stats.corrupt_sectors_recovered);
   }
 }
 
@@ -467,6 +470,9 @@
   EXPECT_EQ(32u, kvs_.GetStorageStats().in_use_bytes);
 }
 
+// The Put_WriteFailure_EntryNotAddedButBytesMarkedWritten test is run with both
+// the KvsErrorRecovery and KvsErrorHandling test fixtures (different KVS
+// configurations).
 TEST_F(KvsErrorRecovery, Put_WriteFailure_EntryNotAddedButBytesMarkedWritten) {
   ASSERT_EQ(Status::OK, kvs_.Init());
   flash_.InjectWriteError(FlashError::Unconditional(Status::UNAVAILABLE, 1));
@@ -561,6 +567,145 @@
   ASSERT_CONTAINS_ENTRY("kee", "O_o");
 }
 
+TEST_F(InitializedMultiMagicKvs, RecoversLossOfFirstSector) {
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 0u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+  EXPECT_EQ(Status::OK, partition_.Erase(0, 1));
+
+  ASSERT_CONTAINS_ENTRY("key1", "value1");
+  ASSERT_CONTAINS_ENTRY("k2", "value2");
+  ASSERT_CONTAINS_ENTRY("k3y", "value3");
+  ASSERT_CONTAINS_ENTRY("A Key", "XD");
+  ASSERT_CONTAINS_ENTRY("kee", "O_o");
+
+  EXPECT_EQ(true, kvs_.error_detected());
+
+  stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 352u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 2 - (160 * (kvs_.redundancy() - 1)));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+  EXPECT_EQ(Status::OK, kvs_.FullMaintenance());
+  stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 0u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 1u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+}
+
+TEST_F(InitializedMultiMagicKvs, RecoversLossOfSecondSector) {
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 0u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+  EXPECT_EQ(Status::OK, partition_.Erase(partition_.sector_size_bytes(), 1));
+
+  ASSERT_CONTAINS_ENTRY("key1", "value1");
+  ASSERT_CONTAINS_ENTRY("k2", "value2");
+  ASSERT_CONTAINS_ENTRY("k3y", "value3");
+  ASSERT_CONTAINS_ENTRY("A Key", "XD");
+  ASSERT_CONTAINS_ENTRY("kee", "O_o");
+
+  EXPECT_EQ(false, kvs_.error_detected());
+
+  EXPECT_EQ(false, kvs_.Init());
+  stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 0u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 10u);
+}
+
+TEST_F(InitializedMultiMagicKvs, SingleReadErrors) {
+  // Inject 2 read errors, so the first read attempt fully fails.
+  flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 2));
+
+  flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 1, 7));
+
+  ASSERT_CONTAINS_ENTRY("key1", "value1");
+  ASSERT_CONTAINS_ENTRY("k2", "value2");
+  ASSERT_CONTAINS_ENTRY("k3y", "value3");
+  ASSERT_CONTAINS_ENTRY("A Key", "XD");
+  ASSERT_CONTAINS_ENTRY("kee", "O_o");
+
+  EXPECT_EQ(true, kvs_.error_detected());
+
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 352u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 2 - (160 * (kvs_.redundancy() - 1)));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+}
+
+TEST_F(InitializedMultiMagicKvs, SingleWriteError) {
+  flash_.InjectWriteError(FlashError::Unconditional(Status::INTERNAL, 1, 1));
+
+  EXPECT_EQ(Status::INTERNAL, kvs_.Put("new key", ByteStr("abcd?")));
+
+  EXPECT_EQ(true, kvs_.error_detected());
+
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, 32 + (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 352u);
+  EXPECT_EQ(stats.writable_bytes,
+            512u * 2 - 32 - (160 * (kvs_.redundancy() - 1)));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+  char val[20] = {};
+  EXPECT_EQ(Status::OK,
+            kvs_.Get("new key", as_writable_bytes(span(val))).status());
+
+  EXPECT_EQ(Status::OK, kvs_.FullMaintenance());
+  stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (192u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 0u);
+  EXPECT_EQ(stats.writable_bytes, 512u * 3 - (192 * kvs_.redundancy()));
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 1u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 6u);
+
+  EXPECT_EQ(Status::OK,
+            kvs_.Get("new key", as_writable_bytes(span(val))).status());
+}
+
+TEST_F(InitializedMultiMagicKvs, DataLossAfterLosingBothCopies) {
+  EXPECT_EQ(Status::OK, partition_.Erase(0, 2));
+
+  char val[20] = {};
+  EXPECT_EQ(Status::DATA_LOSS,
+            kvs_.Get("key1", as_writable_bytes(span(val))).status());
+  EXPECT_EQ(Status::DATA_LOSS,
+            kvs_.Get("k2", as_writable_bytes(span(val))).status());
+  EXPECT_EQ(Status::DATA_LOSS,
+            kvs_.Get("k3y", as_writable_bytes(span(val))).status());
+  EXPECT_EQ(Status::DATA_LOSS,
+            kvs_.Get("A Key", as_writable_bytes(span(val))).status());
+  EXPECT_EQ(Status::DATA_LOSS,
+            kvs_.Get("kee", as_writable_bytes(span(val))).status());
+
+  EXPECT_EQ(true, kvs_.error_detected());
+
+  auto stats = kvs_.GetStorageStats();
+  EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+  EXPECT_EQ(stats.reclaimable_bytes, 2 * 352u);
+  EXPECT_EQ(stats.writable_bytes, 512u);
+  EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+  EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+}
+
 TEST_F(InitializedMultiMagicKvs, PutNewEntry_UsesFirstFormat) {
   EXPECT_EQ(Status::OK, kvs_.Put("new key", ByteStr("abcd?")));
 
diff --git a/pw_kvs/public/pw_kvs/internal/entry_cache.h b/pw_kvs/public/pw_kvs/internal/entry_cache.h
index 1cbb47f..5ed2f15 100644
--- a/pw_kvs/public/pw_kvs/internal/entry_cache.h
+++ b/pw_kvs/public/pw_kvs/internal/entry_cache.h
@@ -20,7 +20,9 @@
 
 #include "pw_containers/vector.h"
 #include "pw_kvs/flash_memory.h"
+#include "pw_kvs/format.h"
 #include "pw_kvs/internal/key_descriptor.h"
+#include "pw_kvs/internal/sectors.h"
 #include "pw_span/span.h"
 
 namespace pw::kvs::internal {
@@ -161,17 +163,9 @@
   //                 key's hash collides with the hash for an existing
   //                 descriptor
   //
-  Status Find(FlashPartition& partition,
-              std::string_view key,
-              EntryMetadata* metadata) const;
-
-  // Searches for a KeyDescriptor that matches this key and sets *metadata to
-  // point to it if one is found.
-  //
-  //          OK: there is a matching descriptor and *metadata is set
-  //   NOT_FOUND: there is no descriptor that matches this key
-  //
-  Status FindExisting(FlashPartition& partition,
+  StatusWithSize Find(FlashPartition& partition,
+                      const Sectors& sectors,
+                      const EntryFormats& formats,
                       std::string_view key,
                       EntryMetadata* metadata) const;
 
diff --git a/pw_kvs/public/pw_kvs/internal/sectors.h b/pw_kvs/public/pw_kvs/internal/sectors.h
index e871f0b..20cd61f 100644
--- a/pw_kvs/public/pw_kvs/internal/sectors.h
+++ b/pw_kvs/public/pw_kvs/internal/sectors.h
@@ -143,15 +143,9 @@
     return Index(sector) * partition_.sector_size_bytes();
   }
 
-  // Returns the sector that contains this address. The address must be valid.
-  SectorDescriptor& FromAddress(Address address) {
-    const size_t index = address / partition_.sector_size_bytes();
+  SectorDescriptor& FromAddress(Address address) const {
     // TODO: Add boundary checking once asserts are supported.
     // DCHECK_LT(index, sector_map_size_);`
-    return descriptors_[index];
-  }
-
-  const SectorDescriptor& FromAddress(Address address) const {
     return descriptors_[address / partition_.sector_size_bytes()];
   }
 
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 40d31a3..1fa5297 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -188,7 +188,10 @@
   // recovery, will do any needed repairing of corruption. Does garbage
   // collection of part of the KVS, typically a single sector or similar unit
   // that makes sense for the KVS implementation.
-  Status PartialMaintenance() { return GarbageCollect(span<const Address>()); }
+  Status PartialMaintenance() {
+    CheckForErrors();
+    return GarbageCollect(span<const Address>());
+  }
 
   void LogDebugInfo() const;
 
@@ -301,6 +304,9 @@
   size_t redundancy() const { return entry_cache_.redundancy(); }
 
   bool error_detected() const { return error_detected_; }
+
+  // Check KVS for any error conditions. Primarily intended for test and
+  // internal use.
   bool CheckForErrors();
 
  protected:
@@ -342,6 +348,30 @@
 
   StatusWithSize ValueSize(const EntryMetadata& metadata) const;
 
+  Status ReadEntry(const EntryMetadata& metadata, Entry& entry) const;
+
+  // Finds the metadata for an entry matching a particular key. Searches for a
+  // KeyDescriptor that matches this key and sets *metadata to point to it if
+  // one is found.
+  //
+  //             OK: there is a matching descriptor and *metadata is set
+  //      NOT_FOUND: there is no descriptor that matches this key, but this key
+  //                 has a unique hash (and could potentially be added to the
+  //                 KVS)
+  // ALREADY_EXISTS: there is no descriptor that matches this key, but the
+  //                 key's hash collides with the hash for an existing
+  //                 descriptor
+  //
+  Status FindEntry(std::string_view key, EntryMetadata* metadata) const;
+
+  // Searches for a KeyDescriptor that matches this key and sets *metadata to
+  // point to it if one is found.
+  //
+  //          OK: there is a matching descriptor and *metadata is set
+  //   NOT_FOUND: there is no descriptor that matches this key
+  //
+  Status FindExisting(std::string_view key, EntryMetadata* metadata) const;
+
   StatusWithSize Get(std::string_view key,
                      const EntryMetadata& metadata,
                      span<std::byte> value_buffer,
@@ -387,6 +417,10 @@
                      std::string_view key,
                      span<const std::byte> value);
 
+  StatusWithSize CopyEntryToSector(Entry& entry,
+                                   SectorDescriptor* new_sector,
+                                   Address& new_address);
+
   Status RelocateEntry(const EntryMetadata& metadata,
                        KeyValueStore::Address& address,
                        span<const Address> addresses_to_skip);
@@ -396,7 +430,7 @@
   Status GarbageCollect(span<const Address> addresses_to_skip);
 
   Status RelocateKeyAddressesInSector(SectorDescriptor& sector_to_gc,
-                                      const EntryMetadata& descriptor,
+                                      EntryMetadata& descriptor,
                                       span<const Address> addresses_to_skip);
 
   Status GarbageCollectSector(SectorDescriptor& sector_to_gc,
@@ -445,7 +479,9 @@
   };
   InitializationState initialized_;
 
-  bool error_detected_;
+  // error_detected_ needs to be set from const KVS methods (such as Get), so
+  // make it mutable.
+  mutable bool error_detected_;
 
   struct ErrorStats {
     size_t corrupt_sectors_recovered;