pw_kvs: Add support for multiple redundant copies of entries

Add support to KVS for storing multiple identical copies of key-value
entries.

Currently only tested for kEntryRedundancy = 1.

Change-Id: Ibb8e5667f1f8406070a8d8854c1145daa76c5aea
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index eb1631f..7f2dc2d 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -108,7 +108,7 @@
       if (status == Status::DATA_LOSS) {
         // The entry could not be read, indicating data corruption within the
         // sector. Try to scan the remainder of the sector for other entries.
-        ERR("KVS init: data loss detected in sector %u at address %zu",
+        WRN("KVS init: data loss detected in sector %u at address %zu",
             SectorIndex(&sector),
             size_t(entry_address));
 
@@ -171,7 +171,7 @@
   for (KeyDescriptor& key_descriptor : key_descriptors_) {
     for (auto& address : key_descriptor.addresses()) {
       Entry entry;
-      TRY(Entry::Read(partition_, key_descriptor.address(), formats_, &entry));
+      TRY(Entry::Read(partition_, address, formats_, &entry));
       SectorFromAddress(address)->AddValidBytes(entry.size());
     }
     if (key_descriptor.IsNewerThan(last_transaction_id_)) {
@@ -376,7 +376,7 @@
 
   if (status.ok()) {
     // TODO: figure out logging how to support multiple addresses.
-    DBG("Overwriting entry for key %#08" PRIx32 " in %u sectors including %u",
+    DBG("Overwriting entry for key %#010" PRIx32 " in %u sectors including %u",
         key_descriptor->hash(),
         unsigned(key_descriptor->addresses().size()),
         SectorIndex(SectorFromAddress(key_descriptor->address())));
@@ -398,7 +398,7 @@
   TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
   // TODO: figure out logging how to support multiple addresses.
-  DBG("Writing tombstone for key %#08" PRIx32 " in %u sectors including %u",
+  DBG("Writing tombstone for key %#010" PRIx32 " in %u sectors including %u",
       key_descriptor->hash(),
       unsigned(key_descriptor->addresses().size()),
       SectorIndex(SectorFromAddress(key_descriptor->address())));
@@ -462,6 +462,8 @@
                                   span<std::byte> value_buffer,
                                   size_t offset_bytes) const {
   Entry entry;
+  // TODO: add support for using one of the redundant entries if reading the
+  // first copy fails.
   TRY_WITH_SIZE(
       Entry::Read(partition_, descriptor.address(), formats_, &entry));
 
@@ -582,23 +584,23 @@
                                                KeyDescriptor::State new_state,
                                                string_view key,
                                                span<const byte> value) {
-  // Find the original entry and sector to update the sector's valid_bytes.
   Entry original_entry;
+  // TODO: add support for using one of the redundant entries if reading the
+  // first copy fails.
   TRY(Entry::Read(
       partition_, key_descriptor->address(), formats_, &original_entry));
 
-  SectorDescriptor* sector;
-  TRY(FindOrRecoverSectorWithSpace(&sector,
-                                   Entry::size(partition_, key, value)));
-  DBG("Writing existing entry; found sector %u (%#" PRIx32 ")",
-      SectorIndex(sector),
-      SectorBaseAddress(sector));
+  // Create a new temporary key descriptor to use while writing the new
+  // key-value out to flash. Once the writing is done, update the main
+  // descriptor for this key with the new information.
+  KeyDescriptor new_key_descriptor(key);
+  TRY(WriteEntry(&new_key_descriptor, key, value, new_state));
 
-  // TODO: Verify the copy does a full copy including the address vector.
+  // Update the main descriptor for the new key version.
   KeyDescriptor old_key_descriptor = *key_descriptor;
+  *key_descriptor = new_key_descriptor;
 
-  TRY(AppendEntry(sector, key_descriptor, key, value, new_state));
-
+  // Remove all the valid bytes for the old key version, which are now stale.
   for (auto& address : old_key_descriptor.addresses()) {
     SectorFromAddress(address)->RemoveValidBytes(original_entry.size());
   }
@@ -614,23 +616,151 @@
     return Status::RESOURCE_EXHAUSTED;
   }
 
-  SectorDescriptor* sector;
-  TRY(FindOrRecoverSectorWithSpace(&sector,
-                                   Entry::size(partition_, key, value)));
-  DBG("Writing new entry; found sector: %u", SectorIndex(sector));
-
-  // Create the KeyDescriptor that will be added to the list. The transaction ID
-  // and address will be set by AppendEntry.
+  // Create the KeyDescriptor that will be added to the list and write it to
+  // flash.
   KeyDescriptor key_descriptor(key);
-  TRY(AppendEntry(sector, &key_descriptor, key, value, KeyDescriptor::kValid));
+  TRY(WriteEntry(&key_descriptor, key, value, KeyDescriptor::kValid));
 
   // Only add the entry when we are certain the write succeeded.
   key_descriptors_.push_back(key_descriptor);
   return Status::OK;
 }
 
+Status KeyValueStore::WriteEntry(KeyDescriptor* key_descriptor,
+                                 string_view key,
+                                 span<const byte> value,
+                                 KeyDescriptor::State new_state) {
+  size_t entry_size = Entry::size(partition_, key, value);
+
+  Entry entry = CreateEntry(0, key, value, new_state);
+  *key_descriptor = entry.descriptor(key);
+  key_descriptor->addresses().clear();
+
+  // For number of redundany entries to be written, do the following:
+  // - Find a sector to write an individual entry to. This optionally will
+  //   include garbage collecting one or more sectors if needed.
+  // - Write the entry to the sector.
+  // - Repeat for redundancy number of total entries.
+  for (size_t i = 0; i < internal::kEntryRedundancy; i++) {
+    SectorDescriptor* sector;
+    TRY(GetSectorForWrite(&sector, entry_size, key_descriptor));
+
+    DBG("Writing entry %zu; found sector: %u", i, SectorIndex(sector));
+    const Address write_address = NextWritableAddress(sector);
+    TRY(AppendEntry(write_address, entry, key, value));
+
+    // Entry copy was written successfully; update the key descriptor to reflect
+    // the new entry.
+    key_descriptor->addresses().push_back(write_address);
+  }
+
+  // Once all the entries are written, add valid bytes to each of the sectors
+  // that entries were written to.
+  for (auto new_address : key_descriptor->addresses()) {
+    SectorFromAddress(new_address)->AddValidBytes(entry.size());
+  }
+
+  return Status::OK;
+}
+
+// Find a sector to use for writing a new entry to. Do automatic garbage
+// collection if needed and allowed.
+//
+//                 OK: Sector found with needed space.
+// RESOURCE_EXHAUSTED: No sector available with the needed space.
+Status KeyValueStore::GetSectorForWrite(SectorDescriptor** sector,
+                                        size_t entry_size,
+                                        KeyDescriptor* key_descriptor) {
+  Status result = FindSectorWithSpace(
+      sector, entry_size, kAppendEntry, key_descriptor->addresses());
+
+  bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;
+
+  // Do garbage collection as needed, so long as policy allows.
+  while (result == Status::RESOURCE_EXHAUSTED && do_auto_gc) {
+    if (options_.gc_on_write == GargbageCollectOnWrite::kOneSector) {
+      // If GC config option is kOneSector clear the flag to not do any more
+      // GC after this try.
+      do_auto_gc = false;
+    }
+    // Garbage collect and then try again to find the best sector.
+    Status gc_status = GarbageCollectPartial();
+    if (!gc_status.ok()) {
+      if (gc_status == Status::NOT_FOUND) {
+        // Not enough space, and no reclaimable bytes, this KVS is full!
+        return Status::RESOURCE_EXHAUSTED;
+      }
+      return gc_status;
+    }
+
+    result = FindSectorWithSpace(
+        sector, entry_size, kAppendEntry, key_descriptor->addresses());
+  }
+
+  if (!result.ok()) {
+    WRN("Unable to find sector to write %zu B", entry_size);
+  }
+  return result;
+}
+
+Status KeyValueStore::AppendEntry(Address write_address,
+                                  Entry& entry,
+                                  string_view key,
+                                  span<const byte> value) {
+  entry.UpdateAddress(write_address);
+
+  StatusWithSize result = entry.Write(key, value);
+  // Remove any bytes that were written, even if the write was not successful.
+  // This is important to retain the writable space invariant on the sectors.
+  SectorFromAddress(write_address)->RemoveWritableBytes(result.size());
+
+  if (!result.ok()) {
+    ERR("Failed to write %zu bytes at %#zx. %zu actually written",
+        entry.size(),
+        size_t(write_address),
+        result.size());
+    return result.status();
+  }
+
+  if (options_.verify_on_write) {
+    TRY(entry.VerifyChecksumInFlash());
+  }
+
+  return Status::OK;
+}
+
 Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
-                                    KeyValueStore::Address address) {
+                                    KeyValueStore::Address old_address) {
+  Entry entry;
+  TRY(Entry::Read(partition_, old_address, formats_, &entry));
+
+  // Find a new sector for the entry and write it to the new location. For
+  // relocation the find should not not be a sector already containing the key
+  // but can be the always empty sector, since this is part of the GC process
+  // that will result in a new empty sector. Also find a sector that does not
+  // have reclaimable space (mostly for the full GC, where that would result in
+  // an immediate extra relocation).
+  SectorDescriptor* new_sector;
+
+  TRY(FindSectorWithSpace(
+      &new_sector, entry.size(), kGarbageCollect, key_descriptor.addresses()));
+  const Address new_address = NextWritableAddress(new_sector);
+  TRY(MoveEntry(new_address, entry));
+
+  // TODO: Perhaps add check that the entry matches the key descriptor (key
+  // hash, ID, checksum).
+
+  // Entry was written successfully; update the key descriptor and the sector
+  // descriptors to reflect the new entry.
+  key_descriptor.UpdateAddress(old_address, new_address);
+  new_sector->AddValidBytes(entry.size());
+  SectorFromAddress(old_address)->RemoveValidBytes(entry.size());
+
+  return Status::OK;
+}
+
+Status KeyValueStore::MoveEntry(Address new_address, Entry& entry) {
+  // Step 1: Read the old entry.
   struct TempEntry {
     Entry::KeyBuffer key;
     std::array<byte, sizeof(working_buffer_) - sizeof(key)> value;
@@ -638,16 +768,9 @@
   auto [key_buffer, value_buffer] =
       *std::launder(reinterpret_cast<TempEntry*>(working_buffer_.data()));
 
-  DBG("Relocating entry at %zx for key %#010" PRIx32,
-      size_t(address),
-      key_descriptor.hash());
-
   // Read the entry to be relocated. Store the entry in a local variable and
   // store the key and value in the TempEntry stored in the static allocated
   // working_buffer_.
-  Entry entry;
-  TRY(Entry::Read(partition_, key_descriptor.address(), formats_, &entry));
-
   TRY_ASSIGN(size_t key_length, entry.ReadKey(key_buffer));
   string_view key = string_view(key_buffer.data(), key_length);
 
@@ -659,43 +782,30 @@
   const span value = span(value_buffer.data(), result.size());
   TRY(entry.VerifyChecksum(key, value));
 
-  // Find a new sector for the entry and write it to the new location. For
-  // relocation the find should not not be a sector already containing the key
-  // but can be the always empty sector, since this is part of the GC process
-  // that will result in a new empty sector. Also find a sector that does not
-  // have reclaimable space (mostly for the full GC, where that would result in
-  // an immediate extra relocation).
-  SectorDescriptor* new_sector;
+  DBG("Moving %zu B entry with transaction ID %zu to address %#zx",
+      entry.size(),
+      size_t(entry.transaction_id()),
+      size_t(new_address));
 
-  // Build a vector of sectors to avoid.
-  Vector<SectorDescriptor*, internal::kEntryRedundancy> old_sectors;
-  for (auto& address : key_descriptor.addresses()) {
-    old_sectors.push_back(SectorFromAddress(address));
+  // Step 2: Write the entry to the new location.
+  entry.UpdateAddress(new_address);
+  result = entry.Write(key, value);
+
+  // Remove any bytes that were written, even if the write was not successful.
+  // This is important to retain the writable space invariant on the sectors.
+  SectorFromAddress(new_address)->RemoveWritableBytes(result.size());
+
+  if (!result.ok()) {
+    ERR("Failed to write %zu bytes at %" PRIx32 ". %zu actually written",
+        entry.size(),
+        new_address,
+        result.size());
+    return result.status();
   }
 
-  // TODO: Remove this once const span can take a non-const span.
-  auto old_sectors_const =
-      span(const_cast<const SectorDescriptor**>(old_sectors.data()),
-           old_sectors.size());
-
-  TRY(FindSectorWithSpace(
-      &new_sector, entry.size(), kGarbageCollect, old_sectors_const));
-
-  // TODO: This does an entry with new transaction ID. This needs to get changed
-  // to be a copy of this entry with the same transaction ID.
-  TRY(AppendEntry(
-      new_sector, &key_descriptor, key, value, key_descriptor.state()));
-
-  // Do the valid bytes accounting for the sector the entry was relocated from.
-  // TODO: AppendEntry() creates an entry with new transaction ID. While that is
-  // used all the old sectors need the valid bytes to be removed. Once it is
-  // switched over to do a copy of the current entry with the same transaction
-  // ID, then the valid bytes need to be removed from only the one sector being
-  // relocated out of.
-  //  SectorFromAddress(address)->RemoveValidBytes(entry.size());
-  (void)address;
-  for (auto& old_sector : old_sectors) {
-    old_sector->RemoveValidBytes(entry.size());
+  // Step 3: Verify write to the new location.
+  if (options_.verify_on_write) {
+    TRY(entry.VerifyChecksumInFlash());
   }
 
   return Status::OK;
@@ -709,10 +819,16 @@
     SectorDescriptor** found_sector,
     size_t size,
     FindSectorMode find_mode,
-    span<const SectorDescriptor*> sectors_to_skip) {
+    span<const Address> addresses_to_skip) {
   SectorDescriptor* first_empty_sector = nullptr;
   bool at_least_two_empty_sectors = (find_mode == kGarbageCollect);
 
+  // Build a vector of sectors to avoid.
+  Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
+  for (auto& address : addresses_to_skip) {
+    sectors_to_skip.push_back(SectorFromAddress(address));
+  }
+
   DBG("Find sector with %zu bytes available, starting with sector %u",
       size,
       SectorIndex(last_new_sector_));
@@ -784,18 +900,6 @@
   return Status::RESOURCE_EXHAUSTED;
 }
 
-Status KeyValueStore::FindOrRecoverSectorWithSpace(SectorDescriptor** sector,
-                                                   size_t size) {
-  Status result = FindSectorWithSpace(sector, size, kAppendEntry);
-  if (result == Status::RESOURCE_EXHAUSTED &&
-      options_.gc_on_write != GargbageCollectOnWrite::kDisabled) {
-    // Garbage collect and then try again to find the best sector.
-    TRY(GarbageCollectPartial());
-    return FindSectorWithSpace(sector, size, kAppendEntry);
-  }
-  return result;
-}
-
 KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorToGarbageCollect() {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   SectorDescriptor* sector_candidate = nullptr;
@@ -861,8 +965,8 @@
   SectorDescriptor* sector_to_gc = FindSectorToGarbageCollect();
 
   if (sector_to_gc == nullptr) {
-    // Nothing to GC, all done.
-    return Status::OK;
+    // Nothing to GC.
+    return Status::NOT_FOUND;
   }
 
   TRY(GarbageCollectSector(sector_to_gc));
@@ -896,43 +1000,6 @@
   return Status::OK;
 }
 
-Status KeyValueStore::AppendEntry(SectorDescriptor* sector,
-                                  KeyDescriptor* key_descriptor,
-                                  string_view key,
-                                  span<const byte> value,
-                                  KeyDescriptor::State new_state) {
-  const Address address = NextWritableAddress(sector);
-  Entry entry = CreateEntry(address, key, value, new_state);
-
-  DBG("Appending %zu B entry with transaction ID %" PRIu32 " to address %#zx",
-      entry.size(),
-      entry.transaction_id(),
-      size_t(address));
-
-  StatusWithSize result = entry.Write(key, value);
-  // Remove any bytes that were written, even if the write was not successful.
-  // This is important to retain the writable space invariant on the sectors.
-  sector->RemoveWritableBytes(result.size());
-
-  if (!result.ok()) {
-    ERR("Failed to write %zu bytes at %" PRIx32 ". %zu actually written",
-        entry.size(),
-        address,
-        result.size());
-    return result.status();
-  }
-
-  if (options_.verify_on_write) {
-    TRY(entry.VerifyChecksumInFlash());
-  }
-
-  // Entry was written successfully; update the key descriptor and the sector
-  // descriptor to reflect the new entry.
-  entry.UpdateDescriptor(key_descriptor);
-  sector->AddValidBytes(result.size());
-  return Status::OK;
-}
-
 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
                                                 std::string_view key,
                                                 span<const byte> value,
diff --git a/pw_kvs/key_value_store_map_test.cc b/pw_kvs/key_value_store_map_test.cc
index a6b2555..9a33b76 100644
--- a/pw_kvs/key_value_store_map_test.cc
+++ b/pw_kvs/key_value_store_map_test.cc
@@ -311,11 +311,12 @@
     StartOperation("GCPartial", "");
     KeyValueStore::StorageStats pre_stats = kvs_.GetStorageStats();
     Status status = kvs_.GarbageCollectPartial();
-    EXPECT_EQ(Status::OK, status);
     KeyValueStore::StorageStats post_stats = kvs_.GetStorageStats();
     if (pre_stats.reclaimable_bytes != 0) {
+      EXPECT_EQ(Status::OK, status);
       EXPECT_LT(post_stats.reclaimable_bytes, pre_stats.reclaimable_bytes);
     } else {
+      EXPECT_EQ(Status::NOT_FOUND, status);
       EXPECT_EQ(post_stats.reclaimable_bytes, 0U);
     }
     FinishOperation("GCPartial", status, "");
diff --git a/pw_kvs/public/pw_kvs/internal/entry.h b/pw_kvs/public/pw_kvs/internal/entry.h
index 773e4c3..0560f28 100644
--- a/pw_kvs/public/pw_kvs/internal/entry.h
+++ b/pw_kvs/public/pw_kvs/internal/entry.h
@@ -94,14 +94,10 @@
         deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
   }
 
-  void UpdateDescriptor(KeyDescriptor* kd) {
-    kd->transaction_id_ = transaction_id();
-    kd->addresses_.assign(1, address_);
-    kd->state_ = deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid;
-  }
-
   StatusWithSize Write(std::string_view key, span<const std::byte> value) const;
 
+  void UpdateAddress(Address address) { address_ = address; }
+
   // Reads a key into a buffer, which must be large enough for a max-length key.
   // If successful, the size is returned in the StatusWithSize. The key is not
   // null terminated.
diff --git a/pw_kvs/public/pw_kvs/internal/key_descriptor.h b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
index 9db386e..9397c4e 100644
--- a/pw_kvs/public/pw_kvs/internal/key_descriptor.h
+++ b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
@@ -46,6 +46,19 @@
   // TODO: remove address() once all the use of it is gone.
   uint32_t address() const { return addresses_[0]; }
 
+  Status UpdateAddress(FlashPartition::Address old_address,
+                       FlashPartition::Address new_address) {
+    for (auto& address : addresses()) {
+      if (address == old_address) {
+        address = new_address;
+        return Status::OK;
+      }
+    }
+
+    // Unable to find the address to update.
+    return Status::INVALID_ARGUMENT;
+  }
+
   Vector<FlashPartition::Address, kEntryRedundancy>& addresses() {
     return addresses_;
   }
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 8b6cecf..bb4b108 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -310,8 +310,19 @@
         "as_bytes(span(&value, 1)) or as_writable_bytes(span(&value, 1)).");
   }
 
+  Status LoadEntry(Address entry_address, Address* next_entry_address);
+  Status ScanForEntry(const SectorDescriptor& sector,
+                      Address start_address,
+                      Address* next_entry_address);
+  Status AppendNewOrOverwriteStaleExistingDescriptor(
+      const KeyDescriptor& key_descriptor);
+
+  KeyDescriptor* FindDescriptor(uint32_t hash);
+
   Status PutBytes(std::string_view key, span<const std::byte> value);
 
+  StatusWithSize ValueSize(const KeyDescriptor& descriptor) const;
+
   StatusWithSize Get(std::string_view key,
                      const KeyDescriptor& descriptor,
                      span<std::byte> value_buffer,
@@ -326,8 +337,6 @@
                       void* value,
                       size_t size_bytes) const;
 
-  StatusWithSize ValueSize(const KeyDescriptor& descriptor) const;
-
   Status CheckOperation(std::string_view key) const;
 
   Status FindKeyDescriptor(std::string_view key,
@@ -348,14 +357,6 @@
         key, const_cast<const KeyDescriptor**>(result));
   }
 
-  Status LoadEntry(Address entry_address, Address* next_entry_address);
-  Status ScanForEntry(const SectorDescriptor& sector,
-                      Address start_address,
-                      Address* next_entry_address);
-  Status AppendNewOrOverwriteStaleExistingDescriptor(
-      const KeyDescriptor& key_descriptor);
-  Status AppendEmptyDescriptor(KeyDescriptor** new_descriptor);
-
   Status WriteEntryForExistingKey(KeyDescriptor* key_descriptor,
                                   KeyDescriptor::State new_state,
                                   std::string_view key,
@@ -363,30 +364,35 @@
 
   Status WriteEntryForNewKey(std::string_view key, span<const std::byte> value);
 
+  Status WriteEntry(KeyDescriptor* key_descriptor,
+                    std::string_view key,
+                    span<const std::byte> value,
+                    KeyDescriptor::State new_state);
+
+  Status GetSectorForWrite(SectorDescriptor** sector,
+                           size_t entry_size,
+                           KeyDescriptor* key_descriptor);
+
+  Status AppendEntry(Address write_address,
+                     Entry& entry,
+                     std::string_view key,
+                     span<const std::byte> value);
+
   Status RelocateEntry(KeyDescriptor& key_descriptor,
                        KeyValueStore::Address address);
 
+  Status MoveEntry(Address new_address, Entry& entry);
+
   enum FindSectorMode { kAppendEntry, kGarbageCollect };
 
   Status FindSectorWithSpace(SectorDescriptor** found_sector,
                              size_t size,
                              FindSectorMode find_mode,
-                             span<const SectorDescriptor*> sector_to_skip =
-                                 span<const SectorDescriptor*>());
-
-  Status FindOrRecoverSectorWithSpace(SectorDescriptor** sector, size_t size);
-
-  Status GarbageCollectSector(SectorDescriptor* sector_to_gc);
+                             span<const Address> addresses_to_skip);
 
   SectorDescriptor* FindSectorToGarbageCollect();
 
-  KeyDescriptor* FindDescriptor(uint32_t hash);
-
-  Status AppendEntry(SectorDescriptor* sector,
-                     KeyDescriptor* key_descriptor,
-                     std::string_view key,
-                     span<const std::byte> value,
-                     KeyDescriptor::State new_state);
+  Status GarbageCollectSector(SectorDescriptor* sector_to_gc);
 
   bool AddressInSector(const SectorDescriptor& sector, Address address) const {
     const Address sector_base = SectorBaseAddress(&sector);