pw_kvs: Add support for tracking redundant entries

Add support in key descriptor to track multiple redundant copies of an
entry.
Add support in Init() to find redundant entries of the same entry.

Change-Id: I19d1e17b6635f4348b69dc37b990daf15c40a813
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index df125d0..8e859f2 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -167,10 +167,11 @@
 
   // For every valid key, increment the valid bytes for that sector.
   for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    Entry entry;
-    TRY(Entry::Read(partition_, key_descriptor.address(), &entry));
-    SectorFromKey(key_descriptor)->AddValidBytes(entry.size());
-
+    for (auto& address : key_descriptor.addresses()) {
+      Entry entry;
+      TRY(Entry::Read(partition_, address, &entry));
+      SectorFromAddress(address)->AddValidBytes(entry.size());
+    }
     if (key_descriptor.IsNewerThan(last_transaction_id_)) {
       last_transaction_id_ = key_descriptor.transaction_id();
       newest_key = &key_descriptor;
@@ -180,7 +181,7 @@
   if (newest_key == nullptr) {
     last_new_sector_ = sectors_.begin();
   } else {
-    last_new_sector_ = SectorFromKey(newest_key);
+    last_new_sector_ = SectorFromAddress(newest_key->addresses().back());
   }
 
   if (!empty_sector_found) {
@@ -259,9 +260,12 @@
   const string_view key(key_buffer.data(), key_length);
 
   TRY(entry.VerifyChecksumInFlash(entry_header_format_.checksum));
+
+  // A valid entry was found, so update the next entry address before doing any
+  // of the checks that happen in AppendNewOrOverwriteStaleExistingDescriptor().
+  *next_entry_address = entry.next_address();
   TRY(AppendNewOrOverwriteStaleExistingDescriptor(entry.descriptor(key)));
 
-  *next_entry_address = entry.next_address();
   return Status::OK;
 }
 
@@ -313,16 +317,30 @@
                  existing_descriptor->transaction_id())) {
     // Existing entry is old; replace the existing entry with the new one.
     *existing_descriptor = key_descriptor;
-  } else {
-    // Otherwise, check if the entries have a duplicate transaction ID, which is
-    // not valid.
-    if (existing_descriptor->transaction_id() ==
-        key_descriptor.transaction_id()) {
-      ERR("Data loss: Duplicated old(=%zu) and new(=%zu) transaction ID",
-          size_t(existing_descriptor->transaction_id()),
-          size_t(key_descriptor.transaction_id()));
+  } else if (existing_descriptor->transaction_id() ==
+             key_descriptor.transaction_id()) {
+    // If the entries have a duplicate transaction ID, add the new (redundant)
+    // entry to the existing descriptor.
+    if (existing_descriptor->hash() != key_descriptor.hash()) {
+      ERR("Duplicate entry for key %#010" PRIx32 " with transaction ID %" PRIu32
+          " has non-matching hash",
+          key_descriptor.hash(),
+          key_descriptor.transaction_id());
       return Status::DATA_LOSS;
     }
+
+    // Verify that this entry is not in the same sector as an existing copy of
+    // this same key.
+    for (auto address : existing_descriptor->addresses()) {
+      if (SectorFromAddress(address) ==
+          SectorFromAddress(key_descriptor.address())) {
+        DBG("Multiple Redundant entries in same sector %u",
+            SectorIndex(SectorFromAddress(address)));
+        return Status::DATA_LOSS;
+      }
+    }
+    existing_descriptor->addresses().push_back(key_descriptor.address());
+  } else {
     DBG("Found stale entry when appending; ignoring");
   }
   return Status::OK;
@@ -366,9 +384,11 @@
   Status status = FindKeyDescriptor(key, &key_descriptor);
 
   if (status.ok()) {
-    DBG("Overwriting entry for key %#08" PRIx32 " in sector %u",
+    // TODO: figure out logging how to support multiple addresses.
+    DBG("Overwriting entry for key %#08" PRIx32 " in %u sectors including %u",
         key_descriptor->hash(),
-        SectorIndex(SectorFromKey(key_descriptor)));
+        unsigned(key_descriptor->addresses().size()),
+        SectorIndex(SectorFromAddress(key_descriptor->address())));
     return WriteEntryForExistingKey(
         key_descriptor, KeyDescriptor::kValid, key, value);
   }
@@ -386,9 +406,11 @@
   KeyDescriptor* key_descriptor;
   TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
-  DBG("Writing tombstone for key %#08" PRIx32 " in sector %u",
+  // TODO: figure out logging how to support multiple addresses.
+  DBG("Writing tombstone for key %#08" PRIx32 " in %u sectors including %u",
       key_descriptor->hash(),
-      SectorIndex(SectorFromKey(key_descriptor)));
+      unsigned(key_descriptor->addresses().size()),
+      SectorIndex(SectorFromAddress(key_descriptor->address())));
   return WriteEntryForExistingKey(
       key_descriptor, KeyDescriptor::kDeleted, key, {});
 }
@@ -568,7 +590,6 @@
   // Find the original entry and sector to update the sector's valid_bytes.
   Entry original_entry;
   TRY(Entry::Read(partition_, key_descriptor->address(), &original_entry));
-  SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
 
   SectorDescriptor* sector;
   TRY(FindOrRecoverSectorWithSpace(&sector,
@@ -577,18 +598,15 @@
       SectorIndex(sector),
       SectorBaseAddress(sector));
 
-  if (old_sector != SectorFromKey(key_descriptor)) {
-    DBG("Sector for old entry (size %zu) was garbage collected. Old entry "
-        "relocated to sector %u",
-        original_entry.size(),
-        SectorIndex(SectorFromKey(key_descriptor)));
-
-    old_sector = SectorFromKey(key_descriptor);
-  }
+  // TODO: Verify the copy does a full copy including the address vector.
+  KeyDescriptor old_key_descriptor = *key_descriptor;
 
   TRY(AppendEntry(sector, key_descriptor, key, value, new_state));
 
-  old_sector->RemoveValidBytes(original_entry.size());
+  for (auto& address : old_key_descriptor.addresses()) {
+    SectorFromAddress(address)->RemoveValidBytes(original_entry.size());
+  }
+
   return Status::OK;
 }
 
@@ -615,7 +633,8 @@
   return Status::OK;
 }
 
-Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor) {
+Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
+                                    KeyValueStore::Address address) {
   struct TempEntry {
     Entry::KeyBuffer key;
     std::array<byte, sizeof(working_buffer_) - sizeof(key)> value;
@@ -623,8 +642,8 @@
   auto [key_buffer, value_buffer] =
       *std::launder(reinterpret_cast<TempEntry*>(working_buffer_.data()));
 
-  DBG("Relocating entry at %zx for key %" PRIx32,
-      size_t(key_descriptor.address()),
+  DBG("Relocating entry at %zx for key %#010" PRIx32,
+      size_t(address),
       key_descriptor.hash());
 
   // Read the entry to be relocated. Store the entry in a local variable and
@@ -644,8 +663,6 @@
   const span value = span(value_buffer.data(), result.size());
   TRY(entry.VerifyChecksum(entry_header_format_.checksum, key, value));
 
-  SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
-
   // Find a new sector for the entry and write it to the new location. For
   // relocation the find should not not be a sector already containing the key
   // but can be the always empty sector, since this is part of the GC process
@@ -654,16 +671,36 @@
   // an immediate extra relocation).
   SectorDescriptor* new_sector;
 
-  // TODO: For redundancy work, replace old_sector_const with a vector of
-  // sectors to avoid.
-  const SectorDescriptor* old_sector_const = old_sector;
+  // Build a vector of sectors to avoid.
+  Vector<SectorDescriptor*, internal::kEntryRedundancy> old_sectors;
+  for (auto& address : key_descriptor.addresses()) {
+    old_sectors.push_back(SectorFromAddress(address));
+  }
+
+  // TODO: Remove this once const span can take a non-const span.
+  auto old_sectors_const =
+      span(const_cast<const SectorDescriptor**>(old_sectors.data()),
+           old_sectors.size());
+
   TRY(FindSectorWithSpace(
-      &new_sector, entry.size(), kGarbageCollect, span(&old_sector_const, 1)));
+      &new_sector, entry.size(), kGarbageCollect, old_sectors_const));
+
+  // TODO: This does an entry with new transaction ID. This needs to get changed
+  // to be a copy of this entry with the same transaction ID.
   TRY(AppendEntry(
       new_sector, &key_descriptor, key, value, key_descriptor.state()));
 
   // Do the valid bytes accounting for the sector the entry was relocated from.
-  old_sector->RemoveValidBytes(entry.size());
+  // TODO: AppendEntry() creates an entry with new transaction ID. While that is
+  // used all the old sectors need the valid bytes to be removed. Once it is
+  // switched over to do a copy of the current entry with the same transaction
+  // ID, then the valid bytes need to be removed from only the one sector being
+  // relocated out of.
+  //  SectorFromAddress(address)->RemoveValidBytes(entry.size());
+  (void)address;
+  for (auto& old_sector : old_sectors) {
+    old_sector->RemoveValidBytes(entry.size());
+  }
 
   return Status::OK;
 }
@@ -842,7 +879,7 @@
     for (auto& descriptor : key_descriptors_) {
       if (AddressInSector(*sector_to_gc, descriptor.address())) {
         DBG("  Relocate entry");
-        TRY(RelocateEntry(descriptor));
+        TRY(RelocateEntry(descriptor, descriptor.address()));
       }
     }
   }
diff --git a/pw_kvs/public/pw_kvs/internal/entry.h b/pw_kvs/public/pw_kvs/internal/entry.h
index 1e9f116..89b98e9 100644
--- a/pw_kvs/public/pw_kvs/internal/entry.h
+++ b/pw_kvs/public/pw_kvs/internal/entry.h
@@ -93,7 +93,7 @@
 
   void UpdateDescriptor(KeyDescriptor* kd) {
     kd->transaction_id_ = transaction_id();
-    kd->address_ = address_;
+    kd->addresses_.assign(1, address_);
     kd->state_ = deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid;
   }
 
diff --git a/pw_kvs/public/pw_kvs/internal/key_descriptor.h b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
index bc4d2f8..9db386e 100644
--- a/pw_kvs/public/pw_kvs/internal/key_descriptor.h
+++ b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
@@ -17,23 +17,42 @@
 #include <cstdint>
 #include <string_view>
 
+#include "pw_containers/vector.h"
 #include "pw_kvs/flash_memory.h"
 #include "pw_kvs/internal/hash.h"
 
 namespace pw::kvs::internal {
 
+constexpr size_t kEntryRedundancy = 1;
+
 // Caches information about a key-value entry. Facilitates quickly finding
 // entries without having to read flash.
 class KeyDescriptor {
  public:
   enum State { kValid, kDeleted };
 
-  constexpr KeyDescriptor(std::string_view key)
-      : KeyDescriptor(key, 0, 0, kValid) {}
+  KeyDescriptor(std::string_view key) : KeyDescriptor(key, 0, 0, kValid) {}
+  KeyDescriptor(const KeyDescriptor& kd)
+      : key_hash_(kd.key_hash_),
+        transaction_id_(kd.transaction_id_),
+        addresses_(kd.addresses_.begin(), kd.addresses_.end()),
+        state_(kd.state_) {}
+
+  KeyDescriptor& operator=(const KeyDescriptor& other) = default;
 
   uint32_t hash() const { return key_hash_; }
   uint32_t transaction_id() const { return transaction_id_; }
-  uint32_t address() const { return address_; }
+
+  // TODO: remove address() once all the use of it is gone.
+  uint32_t address() const { return addresses_[0]; }
+
+  Vector<FlashPartition::Address, kEntryRedundancy>& addresses() {
+    return addresses_;
+  }
+  const Vector<FlashPartition::Address, kEntryRedundancy>& addresses() const {
+    return addresses_;
+  }
+
   State state() const { return state_; }
 
   // True if the KeyDesctiptor's transaction ID is newer than the specified ID.
@@ -47,18 +66,19 @@
  private:
   friend class Entry;
 
-  constexpr KeyDescriptor(std::string_view key,
-                          uint32_t version,
-                          FlashPartition::Address address,
-                          State initial_state)
-      : key_hash_(Hash(key)),
-        transaction_id_(version),
-        address_(address),
-        state_(initial_state) {}
+  KeyDescriptor(std::string_view key,
+                uint32_t version,
+                FlashPartition::Address address,
+                State initial_state)
+      : key_hash_(Hash(key)), transaction_id_(version), state_(initial_state) {
+    addresses_.assign(1, address);
+  }
 
   uint32_t key_hash_;
   uint32_t transaction_id_;
-  FlashPartition::Address address_;
+
+  static_assert(kEntryRedundancy > 0u);
+  Vector<FlashPartition::Address, kEntryRedundancy> addresses_;
 
   // TODO: This information could be packed into the above fields to save RAM.
   State state_;
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index cb76aa9..5451bc4 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -363,7 +363,8 @@
 
   Status WriteEntryForNewKey(std::string_view key, span<const std::byte> value);
 
-  Status RelocateEntry(KeyDescriptor& key_descriptor);
+  Status RelocateEntry(KeyDescriptor& key_descriptor,
+                       KeyValueStore::Address address);
 
   enum FindSectorMode { kAppendEntry, kGarbageCollect };
 
@@ -402,17 +403,13 @@
     return SectorIndex(sector) * partition_.sector_size_bytes();
   }
 
-  SectorDescriptor* SectorFromKey(const KeyDescriptor& descriptor) {
-    const size_t index = descriptor.address() / partition_.sector_size_bytes();
+  SectorDescriptor* SectorFromAddress(const FlashPartition::Address address) {
+    const size_t index = address / partition_.sector_size_bytes();
     // TODO: Add boundary checking once asserts are supported.
-    // DCHECK_LT(index, sector_map_size_);
+    // DCHECK_LT(index, sector_map_size_);`
     return &sectors_[index];
   }
 
-  SectorDescriptor* SectorFromKey(const KeyDescriptor* descriptor) {
-    return SectorFromKey(*descriptor);
-  }
-
   Address NextWritableAddress(const SectorDescriptor* sector) const {
     return SectorBaseAddress(sector) + partition_.sector_size_bytes() -
            sector->writable_bytes();