pw_kvs: EntryCache class

- Allocate space for addresses in the derived KeyValueStore class. This
  allows having multiple KeyValueStores with different redundancies,
  without any wasted memory.
- Addresses are moved to a contiguous array instead of a Vector,
  removing the kMaxEntries * sizeof(uint32_t) bytes of Vector overhead.
- Create the EntryCache and EntryMetadata abstractions. These bring
  together KeyDescriptors and their addresses (see the usage sketch
  below).
- Move functions for finding and updating KeyDescriptors to the
  EntryCache and EntryMetadata classes.
- Start EntryCache unit tests. The tests will be expanded in future CLs.

Change-Id: I726ec198c0c4188086357ac6df944a7670c30000
diff --git a/pw_kvs/BUILD b/pw_kvs/BUILD
index e08c214..9700013 100644
--- a/pw_kvs/BUILD
+++ b/pw_kvs/BUILD
@@ -28,10 +28,12 @@
         "alignment.cc",
         "checksum.cc",
         "entry.cc",
+        "entry_cache.cc",
         "flash_memory.cc",
         "format.cc",
         "key_value_store.cc",
         "public/pw_kvs/internal/entry.h",
+        "public/pw_kvs/internal/entry_cache.h",
         "public/pw_kvs/internal/hash.h",
         "public/pw_kvs/internal/key_descriptor.h",
         "public/pw_kvs/internal/sector_descriptor.h",
@@ -118,6 +120,12 @@
 )
 
 pw_cc_test(
+    name = "entry_cache_test",
+    srcs = ["entry_cache_test.cc"],
+    deps = [":pw_kvs"],
+)
+
+pw_cc_test(
     name = "key_value_store_test",
     srcs = ["key_value_store_test.cc"],
     deps = [
diff --git a/pw_kvs/BUILD.gn b/pw_kvs/BUILD.gn
index 519ee3b..b682455 100644
--- a/pw_kvs/BUILD.gn
+++ b/pw_kvs/BUILD.gn
@@ -33,10 +33,12 @@
     "alignment.cc",
     "checksum.cc",
     "entry.cc",
+    "entry_cache.cc",
     "flash_memory.cc",
     "format.cc",
     "key_value_store.cc",
     "public/pw_kvs/internal/entry.h",
+    "public/pw_kvs/internal/entry_cache.h",
     "public/pw_kvs/internal/hash.h",
     "public/pw_kvs/internal/key_descriptor.h",
     "public/pw_kvs/internal/sector_descriptor.h",
@@ -55,6 +57,7 @@
   ]
   friend = [
     ":entry_test",
+    ":entry_cache_test",
     ":key_value_store_test",
     ":key_value_store_binary_format_test",
     ":key_value_store_map_test",
@@ -98,6 +101,7 @@
     ":alignment_test",
     ":checksum_test",
     ":entry_test",
+    ":entry_cache_test",
     ":key_value_store_test",
     ":key_value_store_binary_format_test",
     ":key_value_store_fuzz_test",
@@ -128,6 +132,11 @@
   sources = [ "entry_test.cc" ]
 }
 
+pw_test("entry_cache_test") {
+  deps = [ ":pw_kvs" ]
+  sources = [ "entry_cache_test.cc" ]
+}
+
 pw_test("key_value_store_test") {
   deps = [
     ":crc16",
diff --git a/pw_kvs/entry_cache.cc b/pw_kvs/entry_cache.cc
new file mode 100644
index 0000000..eec4044
--- /dev/null
+++ b/pw_kvs/entry_cache.cc
@@ -0,0 +1,202 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_kvs/internal/entry_cache.h"
+
+#include <cinttypes>
+
+#include "pw_kvs/flash_memory.h"
+#include "pw_kvs/internal/entry.h"
+#include "pw_kvs/internal/hash.h"
+#include "pw_kvs_private/macros.h"
+#include "pw_log/log.h"
+
+namespace pw::kvs::internal {
+namespace {
+
+using std::string_view;
+
+constexpr FlashPartition::Address kNoAddress = FlashPartition::Address(-1);
+
+}  // namespace
+
+void EntryMetadata::Reset(const KeyDescriptor& descriptor, Address address) {
+  *descriptor_ = descriptor;
+
+  addresses_[0] = address;
+  for (size_t i = 1; i < addresses_.size(); ++i) {
+    addresses_[i] = kNoAddress;
+  }
+  addresses_ = addresses_.first(1);
+}
+
+Status EntryCache::Find(FlashPartition& partition,
+                        string_view key,
+                        EntryMetadata* metadata) const {
+  const uint32_t hash = internal::Hash(key);
+  Entry::KeyBuffer key_buffer;
+
+  for (size_t i = 0; i < descriptors_.size(); ++i) {
+    if (descriptors_[i].key_hash == hash) {
+      TRY(Entry::ReadKey(
+          partition, *first_address(i), key.size(), key_buffer.data()));
+
+      if (key == string_view(key_buffer.data(), key.size())) {
+        PW_LOG_DEBUG("Found match for key hash 0x%08" PRIx32, hash);
+        *metadata = EntryMetadata(descriptors_[i], addresses(i));
+        return Status::OK;
+      } else {
+        PW_LOG_WARN("Found key hash collision for 0x%08" PRIx32, hash);
+        return Status::ALREADY_EXISTS;
+      }
+    }
+  }
+  return Status::NOT_FOUND;
+}
+
+Status EntryCache::FindExisting(FlashPartition& partition,
+                                string_view key,
+                                EntryMetadata* metadata) const {
+  Status status = Find(partition, key, metadata);
+
+  // If the key's hash collides with an existing key or if the key is deleted,
+  // treat it as if it is not in the KVS.
+  if (status == Status::ALREADY_EXISTS ||
+      (status.ok() && metadata->deleted())) {
+    return Status::NOT_FOUND;
+  }
+  return status;
+}
+
+EntryMetadata EntryCache::AddNew(const KeyDescriptor& descriptor,
+                                 Address entry_address) {
+  // TODO(hepler): DCHECK(!full());
+  Address* first_address = ResetAddresses(descriptors_.size(), entry_address);
+  descriptors_.push_back(descriptor);
+  return EntryMetadata(descriptors_.back(), span(first_address, 1));
+}
+
+// TODO: This method is the trigger of the O(valid_entries * all_entries) time
+// complexity for reading. At some cost to memory, this could be optimized by
+// using a hash table instead of scanning, but in practice this should be fine
+// for a small number of keys.
+Status EntryCache::AddNewOrUpdateExisting(const KeyDescriptor& descriptor,
+                                          Address address,
+                                          size_t sector_size_bytes) {
+  // With the new key descriptor, either add it to the descriptor table or
+  // overwrite an existing entry with an older version of the key.
+  const int index = FindIndex(descriptor.key_hash);
+
+  // Write a new entry if there is room.
+  if (index == -1) {
+    if (full()) {
+      return Status::RESOURCE_EXHAUSTED;
+    }
+    AddNew(descriptor, address);
+    return Status::OK;
+  }
+
+  // Existing entry is old; replace the existing entry with the new one.
+  if (descriptor.transaction_id > descriptors_[index].transaction_id) {
+    descriptors_[index] = descriptor;
+    ResetAddresses(index, address);
+    return Status::OK;
+  }
+
+  // If the entries have a duplicate transaction ID, add the new (redundant)
+  // entry to the existing descriptor.
+  if (descriptors_[index].transaction_id == descriptor.transaction_id) {
+    if (descriptors_[index].key_hash != descriptor.key_hash) {
+      PW_LOG_ERROR("Duplicate entry for key 0x%08" PRIx32
+                   " with transaction ID %" PRIu32 " has non-matching hash",
+                   descriptor.key_hash,
+                   descriptor.transaction_id);
+      return Status::DATA_LOSS;
+    }
+
+    // Verify that this entry is not in the same sector as an existing copy of
+    // this same key.
+    for (Address existing_address : addresses(index)) {
+      if (existing_address / sector_size_bytes == address / sector_size_bytes) {
+        PW_LOG_DEBUG("Multiple Redundant entries in same sector %zu",
+                     address / sector_size_bytes);
+        return Status::DATA_LOSS;
+      }
+    }
+
+    AddAddressIfRoom(index, address);
+  } else {
+    PW_LOG_DEBUG("Found stale entry when appending; ignoring");
+  }
+  return Status::OK;
+}
+
+size_t EntryCache::present_entries() const {
+  size_t present_entries = 0;
+
+  for (const KeyDescriptor& descriptor : descriptors_) {
+    if (descriptor.state != EntryState::kDeleted) {
+      present_entries += 1;
+    }
+  }
+
+  return present_entries;
+}
+
+int EntryCache::FindIndex(uint32_t key_hash) const {
+  for (size_t i = 0; i < descriptors_.size(); ++i) {
+    if (descriptors_[i].key_hash == key_hash) {
+      return i;
+    }
+  }
+  return -1;
+}
+
+void EntryCache::AddAddressIfRoom(size_t descriptor_index, Address address) {
+  Address* const existing = first_address(descriptor_index);
+
+  for (size_t i = 0; i < redundancy(); ++i) {
+    if (existing[i] == kNoAddress) {
+      existing[i] = address;
+      return;
+    }
+  }
+}
+
+span<EntryCache::Address> EntryCache::addresses(size_t descriptor_index) const {
+  Address* const addresses = first_address(descriptor_index);
+
+  size_t size = 0;
+  while (size < redundancy() && addresses[size] != kNoAddress) {
+    size += 1;
+  }
+
+  return span(addresses, size);
+}
+
+EntryCache::Address* EntryCache::ResetAddresses(size_t descriptor_index,
+                                                Address address) {
+  Address* first = first_address(descriptor_index);
+  *first = address;
+
+  // Clear the additional addresses, if any.
+  for (size_t i = 1; i < redundancy_; ++i) {
+    first[i] = kNoAddress;
+  }
+
+  return first;
+}
+
+}  // namespace pw::kvs::internal
diff --git a/pw_kvs/entry_cache_test.cc b/pw_kvs/entry_cache_test.cc
new file mode 100644
index 0000000..df1ae90
--- /dev/null
+++ b/pw_kvs/entry_cache_test.cc
@@ -0,0 +1,45 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_kvs/internal/entry_cache.h"
+
+#include "gtest/gtest.h"
+
+namespace pw::kvs::internal {
+
+class EmptyEntryCache : public ::testing::Test {
+ protected:
+  static constexpr size_t kRedundancy = 3;
+
+  EmptyEntryCache() : entries_(descriptors_, addresses_, kRedundancy) {}
+
+  Vector<KeyDescriptor, 32> descriptors_;
+  EntryCache::AddressList<32, kRedundancy> addresses_;
+
+  EntryCache entries_;
+};
+
+TEST_F(EmptyEntryCache, AddNew) {
+  EntryMetadata metadata = entries_.AddNew(
+      {.key_hash = 1, .transaction_id = 123, .state = EntryState::kValid}, 5);
+  EXPECT_EQ(1u, metadata.hash());
+  EXPECT_EQ(123u, metadata.transaction_id());
+
+  EXPECT_EQ(5u, metadata.first_address());
+  EXPECT_EQ(1u, metadata.addresses().size());
+}
+
+// TODO(hepler): Expand these tests!
+
+}  // namespace pw::kvs::internal
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d76323e..2ad68fa 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -19,10 +19,7 @@
 #include <cstring>
 #include <type_traits>
 
-#include "pw_kvs/format.h"
-
 #define PW_LOG_USE_ULTRA_SHORT_NAMES 1
-#include "pw_kvs/internal/entry.h"
 #include "pw_kvs_private/macros.h"
 #include "pw_log/log.h"
 
@@ -47,16 +44,18 @@
 }  // namespace
 
 KeyValueStore::KeyValueStore(FlashPartition* partition,
-                             Vector<KeyDescriptor>& key_descriptor_list,
-                             Vector<SectorDescriptor>& sector_descriptor_list,
-                             size_t redundancy,
                              span<const EntryFormat> formats,
-                             const Options& options)
+                             const Options& options,
+                             size_t redundancy,
+                             Vector<SectorDescriptor>& sector_descriptor_list,
+                             const SectorDescriptor** temp_sectors_to_skip,
+                             Vector<KeyDescriptor>& key_descriptor_list,
+                             Address* addresses)
     : partition_(*partition),
       formats_(formats),
-      key_descriptors_(key_descriptor_list),
+      entry_cache_(key_descriptor_list, addresses, redundancy),
       sectors_(sector_descriptor_list),
-      redundancy_(redundancy),
+      temp_sectors_to_skip_(temp_sectors_to_skip),
       options_(options) {
   initialized_ = false;
   last_new_sector_ = nullptr;
@@ -67,7 +66,7 @@
   initialized_ = false;
   last_new_sector_ = nullptr;
   last_transaction_id_ = 0;
-  key_descriptors_.clear();
+  entry_cache_.Reset();
 
   INF("Initializing key value store");
   if (partition_.sector_count() > sectors_.max_size()) {
@@ -172,26 +171,23 @@
   }
 
   DBG("Second pass: Count valid bytes in each sector");
-  const KeyDescriptor* newest_key = nullptr;
+  Address newest_key = 0;
 
   // For every valid key, increment the valid bytes for that sector.
-  for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    for (auto& address : key_descriptor.addresses()) {
+  for (const EntryMetadata& metadata : entry_cache_) {
+    for (Address address : metadata.addresses()) {
       Entry entry;
       TRY(Entry::Read(partition_, address, formats_, &entry));
       SectorFromAddress(address)->AddValidBytes(entry.size());
     }
-    if (key_descriptor.IsNewerThan(last_transaction_id_)) {
-      last_transaction_id_ = key_descriptor.transaction_id();
-      newest_key = &key_descriptor;
+    if (metadata.IsNewerThan(last_transaction_id_)) {
+      last_transaction_id_ = metadata.transaction_id();
+      newest_key = metadata.addresses().back();
     }
   }
 
-  if (newest_key == nullptr) {
-    last_new_sector_ = sectors_.begin();
-  } else {
-    last_new_sector_ = SectorFromAddress(newest_key->addresses().back());
-  }
+  last_new_sector_ = SectorFromAddress(newest_key);
 
   if (!empty_sector_found) {
     // TODO: Record/report the error condition and recovery result.
@@ -208,7 +204,7 @@
   INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
       "%zu, logical sector size %zu bytes",
       size(),
-      (key_descriptors_.size() - size()),
+      (entry_cache_.total_entries() - size()),
       sectors_.size(),
       partition_.sector_size_bytes());
 
@@ -261,11 +257,10 @@
   TRY(entry.VerifyChecksumInFlash());
 
   // A valid entry was found, so update the next entry address before doing any
-  // of the checks that happen in AppendNewOrOverwriteStaleExistingDescriptor().
+  // of the checks that happen in AddNewOrUpdateExisting.
   *next_entry_address = entry.next_address();
-  TRY(AppendNewOrOverwriteStaleExistingDescriptor(entry.descriptor(key)));
-
-  return Status::OK;
+  return entry_cache_.AddNewOrUpdateExisting(
+      entry.descriptor(key), entry.address(), partition_.sector_size_bytes());
 }
 
 // Scans flash memory within a sector to find a KVS entry magic.
@@ -295,73 +290,15 @@
   return Status::NOT_FOUND;
 }
 
-// TODO: This method is the trigger of the O(valid_entries * all_entries) time
-// complexity for reading. At some cost to memory, this could be optimized by
-// using a hash table instead of scanning, but in practice this should be fine
-// for a small number of keys
-Status KeyValueStore::AppendNewOrOverwriteStaleExistingDescriptor(
-    const KeyDescriptor& key_descriptor) {
-  // With the new key descriptor, either add it to the descriptor table or
-  // overwrite an existing entry with an older version of the key.
-  KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.hash());
-
-  // Write a new entry.
-  if (existing_descriptor == nullptr) {
-    if (key_descriptors_.full()) {
-      return Status::RESOURCE_EXHAUSTED;
-    }
-    key_descriptors_.push_back(key_descriptor);
-  } else if (key_descriptor.IsNewerThan(
-                 existing_descriptor->transaction_id())) {
-    // Existing entry is old; replace the existing entry with the new one.
-    *existing_descriptor = key_descriptor;
-  } else if (existing_descriptor->transaction_id() ==
-             key_descriptor.transaction_id()) {
-    // If the entries have a duplicate transaction ID, add the new (redundant)
-    // entry to the existing descriptor.
-    if (existing_descriptor->hash() != key_descriptor.hash()) {
-      ERR("Duplicate entry for key 0x%08" PRIx32 " with transaction ID %" PRIu32
-          " has non-matching hash",
-          key_descriptor.hash(),
-          key_descriptor.transaction_id());
-      return Status::DATA_LOSS;
-    }
-
-    // Verify that this entry is not in the same sector as an existing copy of
-    // this same key.
-    for (auto address : existing_descriptor->addresses()) {
-      if (SectorFromAddress(address) ==
-          SectorFromAddress(key_descriptor.address())) {
-        DBG("Multiple Redundant entries in same sector %u",
-            SectorIndex(SectorFromAddress(address)));
-        return Status::DATA_LOSS;
-      }
-    }
-    existing_descriptor->addresses().push_back(key_descriptor.address());
-  } else {
-    DBG("Found stale entry when appending; ignoring");
-  }
-  return Status::OK;
-}
-
-KeyValueStore::KeyDescriptor* KeyValueStore::FindDescriptor(uint32_t hash) {
-  for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    if (key_descriptor.hash() == hash) {
-      return &key_descriptor;
-    }
-  }
-  return nullptr;
-}
-
 StatusWithSize KeyValueStore::Get(string_view key,
                                   span<byte> value_buffer,
                                   size_t offset_bytes) const {
   TRY_WITH_SIZE(CheckOperation(key));
 
-  const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
+  EntryMetadata metadata;
+  TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
 
-  return Get(key, *key_descriptor, value_buffer, offset_bytes);
+  return Get(key, metadata, value_buffer, offset_bytes);
 }
 
 Status KeyValueStore::PutBytes(string_view key, span<const byte> value) {
@@ -378,17 +315,16 @@
     return Status::INVALID_ARGUMENT;
   }
 
-  KeyDescriptor* key_descriptor;
-  Status status = FindKeyDescriptor(key, &key_descriptor);
+  EntryMetadata metadata;
+  Status status = entry_cache_.Find(partition_, key, &metadata);
 
   if (status.ok()) {
     // TODO: figure out logging how to support multiple addresses.
-    DBG("Overwriting entry for key 0x%08" PRIx32 " in %u sectors including %u",
-        key_descriptor->hash(),
-        unsigned(key_descriptor->addresses().size()),
-        SectorIndex(SectorFromAddress(key_descriptor->address())));
-    return WriteEntryForExistingKey(
-        key_descriptor, KeyDescriptor::kValid, key, value);
+    DBG("Overwriting entry for key 0x%08" PRIx32 " in %zu sectors including %u",
+        metadata.hash(),
+        metadata.addresses().size(),
+        SectorIndex(SectorFromAddress(metadata.first_address())));
+    return WriteEntryForExistingKey(metadata, EntryState::kValid, key, value);
   }
 
   if (status == Status::NOT_FOUND) {
@@ -401,16 +337,15 @@
 Status KeyValueStore::Delete(string_view key) {
   TRY(CheckOperation(key));
 
-  KeyDescriptor* key_descriptor;
-  TRY(FindExistingKeyDescriptor(key, &key_descriptor));
+  EntryMetadata metadata;
+  TRY(entry_cache_.FindExisting(partition_, key, &metadata));
 
   // TODO: figure out logging how to support multiple addresses.
-  DBG("Writing tombstone for key 0x%08" PRIx32 " in %u sectors including %u",
-      key_descriptor->hash(),
-      unsigned(key_descriptor->addresses().size()),
-      SectorIndex(SectorFromAddress(key_descriptor->address())));
-  return WriteEntryForExistingKey(
-      key_descriptor, KeyDescriptor::kDeleted, key, {});
+  DBG("Writing tombstone for key 0x%08" PRIx32 " in %zu sectors including %u",
+      metadata.hash(),
+      metadata.addresses().size(),
+      SectorIndex(SectorFromAddress(metadata.first_address())));
+  return WriteEntryForExistingKey(metadata, EntryState::kDeleted, key, {});
 }
 
 void KeyValueStore::Item::ReadKey() {
@@ -420,7 +355,7 @@
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
   if (Entry::Read(
-          kvs_.partition_, descriptor_->address(), kvs_.formats_, &entry)
+          kvs_.partition_, iterator_->first_address(), kvs_.formats_, &entry)
           .ok()) {
     entry.ReadKey(key_buffer_);
   }
@@ -428,53 +363,39 @@
 
 KeyValueStore::iterator& KeyValueStore::iterator::operator++() {
   // Skip to the next entry that is valid (not deleted).
-  while (++item_.descriptor_ != item_.kvs_.key_descriptors_.end() &&
-         item_.descriptor_->deleted()) {
+  while (++item_.iterator_ != item_.kvs_.entry_cache_.end() &&
+         item_.iterator_->deleted()) {
   }
   return *this;
 }
 
 KeyValueStore::iterator KeyValueStore::begin() const {
-  const KeyDescriptor* descriptor = key_descriptors_.begin();
+  internal::EntryCache::iterator cache_iterator = entry_cache_.begin();
   // Skip over any deleted entries at the start of the descriptor list.
-  while (descriptor != key_descriptors_.end() && descriptor->deleted()) {
-    ++descriptor;
+  while (cache_iterator != entry_cache_.end() && cache_iterator->deleted()) {
+    ++cache_iterator;
   }
-  return iterator(*this, descriptor);
-}
-
-// TODO(hepler): The valid entry count could be tracked in the KVS to avoid the
-// need for this for-loop.
-size_t KeyValueStore::size() const {
-  size_t valid_entries = 0;
-
-  for (const KeyDescriptor& key_descriptor : key_descriptors_) {
-    if (!key_descriptor.deleted()) {
-      valid_entries += 1;
-    }
-  }
-
-  return valid_entries;
+  return iterator(*this, cache_iterator);
 }
 
 StatusWithSize KeyValueStore::ValueSize(string_view key) const {
   TRY_WITH_SIZE(CheckOperation(key));
 
-  const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
+  EntryMetadata metadata;
+  TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
 
-  return ValueSize(*key_descriptor);
+  return ValueSize(metadata);
 }
 
 StatusWithSize KeyValueStore::Get(string_view key,
-                                  const KeyDescriptor& descriptor,
+                                  const EntryMetadata& metadata,
                                   span<std::byte> value_buffer,
                                   size_t offset_bytes) const {
   Entry entry;
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
   TRY_WITH_SIZE(
-      Entry::Read(partition_, descriptor.address(), formats_, &entry));
+      Entry::Read(partition_, metadata.first_address(), formats_, &entry));
 
   StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
   if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
@@ -495,19 +416,19 @@
                                    size_t size_bytes) const {
   TRY(CheckOperation(key));
 
-  const KeyDescriptor* descriptor;
-  TRY(FindExistingKeyDescriptor(key, &descriptor));
+  EntryMetadata metadata;
+  TRY(entry_cache_.FindExisting(partition_, key, &metadata));
 
-  return FixedSizeGet(key, *descriptor, value, size_bytes);
+  return FixedSizeGet(key, metadata, value, size_bytes);
 }
 
 Status KeyValueStore::FixedSizeGet(std::string_view key,
-                                   const KeyDescriptor& descriptor,
+                                   const EntryMetadata& metadata,
                                    void* value,
                                    size_t size_bytes) const {
   // Ensure that the size of the stored value matches the size of the type.
   // Otherwise, report error. This check avoids potential memory corruption.
-  TRY_ASSIGN(const size_t actual_size, ValueSize(descriptor));
+  TRY_ASSIGN(const size_t actual_size, ValueSize(metadata));
 
   if (actual_size != size_bytes) {
     DBG("Requested %zu B read, but value is %zu B", size_bytes, actual_size);
@@ -515,17 +436,17 @@
   }
 
   StatusWithSize result =
-      Get(key, descriptor, span(static_cast<byte*>(value), size_bytes), 0);
+      Get(key, metadata, span(static_cast<byte*>(value), size_bytes), 0);
 
   return result.status();
 }
 
-StatusWithSize KeyValueStore::ValueSize(const KeyDescriptor& descriptor) const {
+StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
   Entry entry;
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
   TRY_WITH_SIZE(
-      Entry::Read(partition_, descriptor.address(), formats_, &entry));
+      Entry::Read(partition_, metadata.first_address(), formats_, &entry));
 
   return StatusWithSize(entry.value_size());
 }
@@ -540,152 +461,97 @@
   return Status::OK;
 }
 
-// Searches for a KeyDescriptor that matches this key and sets *result to point
-// to it if one is found.
-//
-//             OK: there is a matching descriptor and *result is set
-//      NOT_FOUND: there is no descriptor that matches this key, but this key
-//                 has a unique hash (and could potentially be added to the KVS)
-// ALREADY_EXISTS: there is no descriptor that matches this key, but the
-//                 key's hash collides with the hash for an existing descriptor
-//
-Status KeyValueStore::FindKeyDescriptor(string_view key,
-                                        const KeyDescriptor** result) const {
-  const uint32_t hash = internal::Hash(key);
-  Entry::KeyBuffer key_buffer;
-
-  for (auto& descriptor : key_descriptors_) {
-    if (descriptor.hash() == hash) {
-      // TODO: add support for using one of the redundant entries if reading the
-      // first copy fails.
-      TRY(Entry::ReadKey(
-          partition_, descriptor.address(), key.size(), key_buffer.data()));
-
-      if (key == string_view(key_buffer.data(), key.size())) {
-        DBG("Found match for key hash 0x%08" PRIx32, hash);
-        *result = &descriptor;
-        return Status::OK;
-      } else {
-        WRN("Found key hash collision for 0x%08" PRIx32, hash);
-        return Status::ALREADY_EXISTS;
-      }
-    }
-  }
-  return Status::NOT_FOUND;
-}
-
-// Searches for a KeyDescriptor that matches this key and sets *result to point
-// to it if one is found.
-//
-//          OK: there is a matching descriptor and *result is set
-//   NOT_FOUND: there is no descriptor that matches this key
-//
-Status KeyValueStore::FindExistingKeyDescriptor(
-    string_view key, const KeyDescriptor** result) const {
-  Status status = FindKeyDescriptor(key, result);
-
-  // If the key's hash collides with an existing key or if the key is deleted,
-  // treat it as if it is not in the KVS.
-  if (status == Status::ALREADY_EXISTS ||
-      (status.ok() && (*result)->deleted())) {
-    return Status::NOT_FOUND;
-  }
-  return status;
-}
-
-Status KeyValueStore::WriteEntryForExistingKey(KeyDescriptor* key_descriptor,
-                                               KeyDescriptor::State new_state,
+Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
+                                               EntryState new_state,
                                                string_view key,
                                                span<const byte> value) {
   // Read the original entry to get the size for sector accounting purposes.
   Entry entry;
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
-  TRY(Entry::Read(partition_, key_descriptor->address(), formats_, &entry));
+  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
 
-  return WriteEntry(key, value, new_state, key_descriptor, entry.size());
+  return WriteEntry(key, value, new_state, &metadata, entry.size());
 }
 
 Status KeyValueStore::WriteEntryForNewKey(string_view key,
                                           span<const byte> value) {
-  if (key_descriptors_.full()) {
+  if (entry_cache_.full()) {
     WRN("KVS full: trying to store a new entry, but can't. Have %zu entries",
-        key_descriptors_.size());
+        entry_cache_.total_entries());
     return Status::RESOURCE_EXHAUSTED;
   }
 
-  return WriteEntry(key, value, KeyDescriptor::kValid);
+  return WriteEntry(key, value, EntryState::kValid);
 }
 
 Status KeyValueStore::WriteEntry(string_view key,
                                  span<const byte> value,
-                                 KeyDescriptor::State new_state,
-                                 KeyDescriptor* prior_descriptor,
+                                 EntryState new_state,
+                                 EntryMetadata* prior_metadata,
                                  size_t prior_size) {
   const size_t entry_size = Entry::size(partition_, key, value);
 
   // List of addresses for sectors with space for this entry.
-  // TODO: The derived class should allocate space for this address list.
-  Vector<Address, internal::kEntryRedundancy> addresses;
+  Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();
 
   // Find sectors to write the entry to. This may involve garbage collecting one
   // or more sectors.
-  for (size_t i = 0; i < redundancy_; i++) {
+  for (size_t i = 0; i < redundancy(); i++) {
     SectorDescriptor* sector;
-    TRY(GetSectorForWrite(&sector, entry_size, addresses));
+    TRY(GetSectorForWrite(&sector, entry_size, span(reserved_addresses, i)));
 
     DBG("Found space for entry in sector %u", SectorIndex(sector));
-    addresses.push_back(NextWritableAddress(sector));
+    reserved_addresses[i] = NextWritableAddress(sector);
   }
 
   // Write the entry at the first address that was found.
-  Entry entry = CreateEntry(addresses.front(), key, value, new_state);
+  Entry entry = CreateEntry(reserved_addresses[0], key, value, new_state);
   TRY(AppendEntry(entry, key, value));
 
   // After writing the first entry successfully, update the key descriptors.
   // Once a single new entry is written, the old entries are invalidated.
-  KeyDescriptor& new_descriptor =
-      UpdateKeyDescriptor(entry, key, prior_descriptor, prior_size);
+  EntryMetadata new_metadata =
+      UpdateKeyDescriptor(entry, key, prior_metadata, prior_size);
 
   // Write the additional copies of the entry, if redundancy is greater than 1.
-  for (size_t i = 1; i < addresses.size(); ++i) {
-    entry.set_address(addresses[i]);
+  for (size_t i = 1; i < redundancy(); ++i) {
+    entry.set_address(reserved_addresses[i]);
     TRY(AppendEntry(entry, key, value));
-    new_descriptor.addresses().push_back(addresses[i]);
+    new_metadata.AddNewAddress(reserved_addresses[i]);
   }
   return Status::OK;
 }
 
-internal::KeyDescriptor& KeyValueStore::UpdateKeyDescriptor(
+KeyValueStore::EntryMetadata KeyValueStore::UpdateKeyDescriptor(
     const Entry& entry,
     string_view key,
-    KeyDescriptor* prior_descriptor,
+    EntryMetadata* prior_metadata,
     size_t prior_size) {
   // If there is no prior descriptor, create a new one.
-  if (prior_descriptor == nullptr) {
-    key_descriptors_.push_back(entry.descriptor(key));
-    return key_descriptors_.back();
+  if (prior_metadata == nullptr) {
+    return entry_cache_.AddNew(entry.descriptor(key), entry.address());
   }
 
   // Remove valid bytes for the old entry and its copies, which are now stale.
-  for (Address address : prior_descriptor->addresses()) {
+  for (Address address : prior_metadata->addresses()) {
     SectorFromAddress(address)->RemoveValidBytes(prior_size);
   }
 
-  *prior_descriptor = entry.descriptor(prior_descriptor->hash());
-  return *prior_descriptor;
+  prior_metadata->Reset(entry.descriptor(key), entry.address());
+  return *prior_metadata;
 }
 
-// Find a sector to use for writing a new entry to. Do automatic garbage
+// Finds a sector to use for writing a new entry to. Does automatic garbage
 // collection if needed and allowed.
 //
 //                 OK: Sector found with needed space.
 // RESOURCE_EXHAUSTED: No sector available with the needed space.
 Status KeyValueStore::GetSectorForWrite(SectorDescriptor** sector,
                                         size_t entry_size,
-                                        span<const Address> addresses_to_skip) {
+                                        span<const Address> reserved) {
   Status result =
-      FindSectorWithSpace(sector, entry_size, kAppendEntry, addresses_to_skip);
+      FindSectorWithSpace(sector, entry_size, kAppendEntry, {}, reserved);
 
   size_t gc_sector_count = 0;
   bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;
@@ -698,7 +564,7 @@
       do_auto_gc = false;
     }
     // Garbage collect and then try again to find the best sector.
-    Status gc_status = GarbageCollectPartial(addresses_to_skip);
+    Status gc_status = GarbageCollectPartial(reserved);
     if (!gc_status.ok()) {
       if (gc_status == Status::NOT_FOUND) {
         // Not enough space, and no reclaimable bytes, this KVS is full!
@@ -707,8 +573,8 @@
       return gc_status;
     }
 
-    result = FindSectorWithSpace(
-        sector, entry_size, kAppendEntry, addresses_to_skip);
+    result =
+        FindSectorWithSpace(sector, entry_size, kAppendEntry, {}, reserved);
 
     gc_sector_count++;
     // Allow total sectors + 2 number of GC cycles so that once reclaimable
@@ -753,9 +619,9 @@
   return Status::OK;
 }
 
-Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
+Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
                                     KeyValueStore::Address& address,
-                                    span<const Address> addresses_to_skip) {
+                                    span<const Address> reserved_addresses) {
   Entry entry;
   TRY(Entry::Read(partition_, address, formats_, &entry));
 
@@ -767,34 +633,18 @@
   // an immediate extra relocation).
   SectorDescriptor* new_sector;
 
-  // Avoid both this entry's sectors and sectors in addresses_to_skip.
-  //
-  // This is overly strict. addresses_to_skip is populated when there are
-  // sectors reserved for a new entry. It is safe to garbage collect into these
-  // sectors, as long as there remains room for the pending entry. These
-  // reserved sectors could also be garbage collected if they have recoverable
-  // space.
-  // TODO(hepler): Look into improving garbage collection.
-  //
-  // TODO: The derived class should allocate space for this address list.
-  Vector<Address, internal::kEntryRedundancy* 2> all_addresses_to_skip =
-      key_descriptor.addresses();
-  for (Address address : addresses_to_skip) {
-    if (!Contains(all_addresses_to_skip, address)) {
-      // TODO: DCHECK(!all_addresses_to_skip.full)
-      all_addresses_to_skip.push_back(address);
-    }
-  }
-
-  TRY(FindSectorWithSpace(
-      &new_sector, entry.size(), kGarbageCollect, all_addresses_to_skip));
+  TRY(FindSectorWithSpace(&new_sector,
+                          entry.size(),
+                          kGarbageCollect,
+                          metadata.addresses(),
+                          reserved_addresses));
 
   const Address new_address = NextWritableAddress(new_sector);
   const StatusWithSize result = entry.Copy(new_address);
   new_sector->RemoveWritableBytes(result.size());
   TRY(result);
 
-  // Entry was written successfully; update the key descriptor and the sector
+  // Entry was written successfully; update the descriptor's address and the sector
   // descriptors to reflect the new entry.
   SectorFromAddress(address)->RemoveValidBytes(result.size());
   new_sector->AddValidBytes(result.size());
@@ -811,7 +661,8 @@
     SectorDescriptor** found_sector,
     size_t size,
     FindSectorMode find_mode,
-    span<const Address> addresses_to_skip) {
+    span<const Address> addresses_to_skip,
+    span<const Address> reserved_addresses) {
   SectorDescriptor* first_empty_sector = nullptr;
   bool at_least_two_empty_sectors = (find_mode == kGarbageCollect);
 
@@ -819,19 +670,30 @@
   SectorDescriptor* non_empty_least_reclaimable_sector = nullptr;
   const size_t sector_size_bytes = partition_.sector_size_bytes();
 
-  // Build a vector of sectors to avoid.
-  // TODO(hepler): Consolidate the different address / sector lists.
-  Vector<SectorDescriptor*, internal::kEntryRedundancy * 2> sectors_to_skip;
+  // Build a list of sectors to avoid.
+  //
+  // This is overly strict. reserved_addresses is populated when there are
+  // sectors reserved for a new entry. It is safe to garbage collect into
+  // these sectors, as long as there remains room for the pending entry. These
+  // reserved sectors could also be garbage collected if they have recoverable
+  // space. For simplicity, avoid both the relocating key's redundant entries
+  // (addresses_to_skip) and the sectors reserved for pending writes
+  // (reserved_addresses).
+  // TODO(hepler): Look into improving garbage collection.
+  size_t sectors_to_skip = 0;
   for (Address address : addresses_to_skip) {
-    sectors_to_skip.push_back(SectorFromAddress(address));
+    temp_sectors_to_skip_[sectors_to_skip++] = SectorFromAddress(address);
+  }
+  for (Address address : reserved_addresses) {
+    temp_sectors_to_skip_[sectors_to_skip++] = SectorFromAddress(address);
   }
 
   DBG("Find sector with %zu bytes available, starting with sector %u, %s",
       size,
       SectorIndex(last_new_sector_),
       (find_mode == kAppendEntry) ? "Append" : "GC");
-  for (auto& skip_sector : sectors_to_skip) {
-    DBG("  Skip sector %u", SectorIndex(skip_sector));
+  for (size_t i = 0; i < sectors_to_skip; ++i) {
+    DBG("  Skip sector %u", SectorIndex(temp_sectors_to_skip_[i]));
   }
 
   // The last_new_sector_ is the sector that was last selected as the "new empty
@@ -865,7 +727,7 @@
     }
 
     // Skip sectors in the skip list.
-    if (Contains(sectors_to_skip, sector)) {
+    if (Contains(span(temp_sectors_to_skip_, sectors_to_skip), sector)) {
       continue;
     }
 
@@ -925,18 +787,18 @@
 // abstraction for the sector list. Look in to being able to unit test this as
 // its own thing
 KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorToGarbageCollect(
-    span<const Address> addresses_to_avoid) {
+    span<const Address> reserved_addresses) {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   SectorDescriptor* sector_candidate = nullptr;
   size_t candidate_bytes = 0;
 
   // Build a vector of sectors to avoid.
-  // TODO(hepler): Consolidate the three address / sector lists.
-  Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
-  for (auto& address : addresses_to_avoid) {
-    sectors_to_skip.push_back(SectorFromAddress(address));
-    DBG("    Skip sector %u", SectorIndex(SectorFromAddress(address)));
+  for (size_t i = 0; i < reserved_addresses.size(); ++i) {
+    temp_sectors_to_skip_[i] = SectorFromAddress(reserved_addresses[i]);
+    DBG("    Skip sector %u",
+        SectorIndex(SectorFromAddress(reserved_addresses[i])));
   }
+  const span sectors_to_skip(temp_sectors_to_skip_, reserved_addresses.size());
 
   // Step 1: Try to find sectors with stale keys and no valid keys (no
   // relocation needed). If any such sectors are found, use the sector with the
@@ -1009,15 +871,15 @@
 }
 
 Status KeyValueStore::GarbageCollectPartial(
-    span<const Address> addresses_to_skip) {
+    span<const Address> reserved_addresses) {
   DBG("Garbage Collect a single sector");
-  for (auto address : addresses_to_skip) {
+  for (Address address : reserved_addresses) {
     DBG("   Avoid address %u", unsigned(address));
   }
 
   // Step 1: Find the sector to garbage collect
   SectorDescriptor* sector_to_gc =
-      FindSectorToGarbageCollect(addresses_to_skip);
+      FindSectorToGarbageCollect(reserved_addresses);
 
   if (sector_to_gc == nullptr) {
     // Nothing to GC.
@@ -1025,19 +887,19 @@
   }
 
   // Step 2: Garbage collect the selected sector.
-  return GarbageCollectSector(sector_to_gc, addresses_to_skip);
+  return GarbageCollectSector(sector_to_gc, reserved_addresses);
 }
 
 Status KeyValueStore::RelocateKeyAddressesInSector(
-    internal::SectorDescriptor& sector_to_gc,
-    internal::KeyDescriptor& descriptor,
-    span<const Address> addresses_to_skip) {
-  for (FlashPartition::Address& address : descriptor.addresses()) {
+    SectorDescriptor& sector_to_gc,
+    const EntryMetadata& metadata,
+    span<const Address> reserved_addresses) {
+  for (FlashPartition::Address& address : metadata.addresses()) {
     if (AddressInSector(sector_to_gc, address)) {
-      DBG("  Relocate entry for Key 0x%08zx, sector %u",
-          size_t(descriptor.hash()),
+      DBG("  Relocate entry for Key 0x%08" PRIx32 ", sector %u",
+          metadata.hash(),
           SectorIndex(SectorFromAddress(address)));
-      TRY(RelocateEntry(descriptor, address, addresses_to_skip));
+      TRY(RelocateEntry(metadata, address, reserved_addresses));
     }
   }
 
@@ -1045,12 +907,12 @@
 };
 
 Status KeyValueStore::GarbageCollectSector(
-    SectorDescriptor* sector_to_gc, span<const Address> addresses_to_skip) {
+    SectorDescriptor* sector_to_gc, span<const Address> reserved_addresses) {
   // Step 1: Move any valid entries in the GC sector to other sectors
   if (sector_to_gc->valid_bytes() != 0) {
-    for (KeyDescriptor& descriptor : key_descriptors_) {
+    for (const EntryMetadata& metadata : entry_cache_) {
       TRY(RelocateKeyAddressesInSector(
-          *sector_to_gc, descriptor, addresses_to_skip));
+          *sector_to_gc, metadata, reserved_addresses));
     }
   }
 
@@ -1073,7 +935,7 @@
 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
                                                 string_view key,
                                                 span<const byte> value,
-                                                KeyDescriptor::State state) {
+                                                EntryState state) {
   // Always bump the transaction ID when creating a new entry.
   //
   // Burning transaction IDs prevents inconsistencies between flash and memory
@@ -1088,7 +950,7 @@
   // By always burning transaction IDs, the above problem can't happen.
   last_transaction_id_ += 1;
 
-  if (state == KeyDescriptor::kDeleted) {
+  if (state == EntryState::kDeleted) {
     return Entry::Tombstone(
         partition_, address, formats_.primary(), key, last_transaction_id_);
   }
@@ -1100,7 +962,7 @@
                       last_transaction_id_);
 }
 
-void KeyValueStore::LogDebugInfo() {
+void KeyValueStore::LogDebugInfo() const {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   DBG("====================== KEY VALUE STORE DUMP =========================");
   DBG(" ");
@@ -1113,18 +975,18 @@
   DBG("  Alignment        = %zu", partition_.alignment_bytes());
   DBG(" ");
   DBG("Key descriptors:");
-  DBG("  Entry count     = %zu", key_descriptors_.size());
-  DBG("  Max entry count = %zu", key_descriptors_.max_size());
+  DBG("  Entry count     = %zu", entry_cache_.total_entries());
+  DBG("  Max entry count = %zu", entry_cache_.max_entries());
   DBG(" ");
   DBG("      #     hash        version    address   address (hex)");
-  for (size_t i = 0; i < key_descriptors_.size(); ++i) {
-    const KeyDescriptor& kd = key_descriptors_[i];
+  size_t i = 0;
+  for (const EntryMetadata& metadata : entry_cache_) {
     DBG("   |%3zu: | %8zx  |%8zu  | %8zu | %8zx",
-        i,
-        size_t(kd.hash()),
-        size_t(kd.transaction_id()),
-        size_t(kd.address()),
-        size_t(kd.address()));
+        i++,
+        size_t(metadata.hash()),
+        size_t(metadata.transaction_id()),
+        size_t(metadata.first_address()),
+        size_t(metadata.first_address()));
   }
   DBG(" ");
 
@@ -1188,13 +1050,13 @@
 }
 
 void KeyValueStore::LogKeyDescriptor() const {
-  DBG("Key descriptors: count %zu", key_descriptors_.size());
-  for (auto& key : key_descriptors_) {
-    DBG("  - Key: %s, hash %#zx, transaction ID %zu, address %#zx",
-        key.deleted() ? "Deleted" : "Valid",
-        static_cast<size_t>(key.hash()),
-        static_cast<size_t>(key.transaction_id()),
-        static_cast<size_t>(key.address()));
+  DBG("Key descriptors: count %zu", entry_cache_.total_entries());
+  for (auto& metadata : entry_cache_) {
+    DBG("  - Key: %s, hash %#zx, transaction ID %zu, first address %#zx",
+        metadata.deleted() ? "Deleted" : "Valid",
+        static_cast<size_t>(metadata.hash()),
+        static_cast<size_t>(metadata.transaction_id()),
+        static_cast<size_t>(metadata.first_address()));
   }
 }
 
diff --git a/pw_kvs/public/pw_kvs/internal/entry.h b/pw_kvs/public/pw_kvs/internal/entry.h
index c2f9cf7..bcb6e41 100644
--- a/pw_kvs/public/pw_kvs/internal/entry.h
+++ b/pw_kvs/public/pw_kvs/internal/entry.h
@@ -25,6 +25,7 @@
 #include "pw_kvs/checksum.h"
 #include "pw_kvs/flash_memory.h"
 #include "pw_kvs/format.h"
+#include "pw_kvs/internal/hash.h"
 #include "pw_kvs/internal/key_descriptor.h"
 #include "pw_span/span.h"
 
@@ -87,19 +88,13 @@
   Entry() = default;
 
   KeyDescriptor descriptor(std::string_view key) const {
-    return KeyDescriptor(
-        key,
-        transaction_id(),
-        address(),
-        deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
+    return descriptor(Hash(key));
   }
 
   KeyDescriptor descriptor(uint32_t key_hash) const {
-    return KeyDescriptor(
-        key_hash,
-        transaction_id(),
-        address(),
-        deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
+    return KeyDescriptor{key_hash,
+                         transaction_id(),
+                         deleted() ? EntryState::kDeleted : EntryState::kValid};
   }
 
   StatusWithSize Write(std::string_view key, span<const std::byte> value) const;
diff --git a/pw_kvs/public/pw_kvs/internal/entry_cache.h b/pw_kvs/public/pw_kvs/internal/entry_cache.h
new file mode 100644
index 0000000..5cbef5c
--- /dev/null
+++ b/pw_kvs/public/pw_kvs/internal/entry_cache.h
@@ -0,0 +1,219 @@
+// Copyright 2020 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <string_view>
+
+#include "pw_containers/vector.h"
+#include "pw_kvs/flash_memory.h"
+#include "pw_kvs/internal/key_descriptor.h"
+#include "pw_span/span.h"
+
+namespace pw::kvs::internal {
+
+// Caches information about a key-value entry. Facilitates quickly finding
+// entries without having to read flash.
+class EntryMetadata {
+ public:
+  using Address = FlashPartition::Address;
+
+  EntryMetadata() = default;
+
+  uint32_t hash() const { return descriptor_->key_hash; }
+
+  uint32_t transaction_id() const { return descriptor_->transaction_id; }
+
+  EntryState state() const { return descriptor_->state; }
+
+  bool deleted() const { return state() == EntryState::kDeleted; }
+
+  // The first known address of this entry.
+  uint32_t first_address() const { return addresses_[0]; }
+
+  // All addresses for this entry, including redundant entries.
+  const span<Address>& addresses() const { return addresses_; }
+
+  // True if the KeyDescriptor's transaction ID is newer than the specified ID.
+  bool IsNewerThan(uint32_t other_transaction_id) const {
+    // TODO: Consider handling rollover.
+    return transaction_id() > other_transaction_id;
+  }
+
+  // Adds a new address to the entry metadata. MUST NOT be called more than
+  // allowed by the redundancy.
+  void AddNewAddress(Address address) {
+    addresses_[addresses_.size()] = address;
+    addresses_ = span(addresses_.begin(), addresses_.size() + 1);
+  }
+
+  // Resets the KeyDescriptor and addresses to refer to the provided
+  // KeyDescriptor and address.
+  void Reset(const KeyDescriptor& descriptor, Address address);
+
+ private:
+  friend class EntryCache;
+
+  constexpr EntryMetadata(KeyDescriptor& descriptor, span<Address> addresses)
+      : descriptor_(&descriptor), addresses_(addresses) {}
+
+  KeyDescriptor* descriptor_;
+  span<Address> addresses_;
+};
+
+// Tracks entry metadata. Combines KeyDescriptors with their associated
+// addresses.
+class EntryCache {
+ public:
+  using Address = FlashPartition::Address;
+
+  // The type to use for an address list with the specified number of entries
+  // and redundancy. kRedundancy extra entries are added to make room for a
+  // temporary list of entry addresses.
+  template <size_t kMaxEntries, size_t kRedundancy>
+  using AddressList = Address[kMaxEntries * kRedundancy + kRedundancy];
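+  //
+  // For example (illustrative sizes), AddressList<2, 2> is laid out as:
+  //
+  //   [e0_copy0, e0_copy1, e1_copy0, e1_copy1, temp0, temp1]
+  //
+  // Entry N's addresses start at index N * kRedundancy; the trailing
+  // kRedundancy slots back TempReservedAddressesForWrite().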
+
+  constexpr EntryCache(Vector<KeyDescriptor>& descriptors,
+                       Address* addresses,
+                       size_t redundancy)
+      : descriptors_(descriptors),
+        addresses_(addresses),
+        redundancy_(redundancy) {}
+
+  // Clears all cached entries.
+  void Reset() { descriptors_.clear(); }
+
+  // Finds the metadata for an entry matching a particular key. Searches for a
+  // KeyDescriptor that matches this key and sets *metadata to point to it if
+  // one is found.
+  //
+  //             OK: there is a matching descriptor and *metadata is set
+  //      NOT_FOUND: there is no descriptor that matches this key, but this key
+  //                 has a unique hash (and could potentially be added to the
+  //                 KVS)
+  // ALREADY_EXISTS: there is no descriptor that matches this key, but the
+  //                 key's hash collides with the hash for an existing
+  //                 descriptor
+  //
+  Status Find(FlashPartition& partition,
+              std::string_view key,
+              EntryMetadata* metadata) const;
+
+  // Searches for a KeyDescriptor that matches this key and sets *metadata to
+  // point to it if one is found.
+  //
+  //          OK: there is a matching descriptor and *metadata is set
+  //   NOT_FOUND: there is no descriptor that matches this key
+  //
+  Status FindExisting(FlashPartition& partition,
+                      std::string_view key,
+                      EntryMetadata* metadata) const;
+
+  // Adds a new descriptor to the descriptor list. The cache must NOT be full!
+  EntryMetadata AddNew(const KeyDescriptor& entry, Address address);
+
+  // Adds a new descriptor, overwrites an existing one, or adds an additional
+  // redundant address to one. The sector size is included for checking that
+  // redundant entries are in different sectors.
+  Status AddNewOrUpdateExisting(const KeyDescriptor& descriptor,
+                                Address address,
+                                size_t sector_size_bytes);
+
+  // Returns a pointer to an array of redundancy() addresses for temporary use.
+  // This is used by the KeyValueStore to track reserved addresses when finding
+  // space for a new entry.
+  Address* TempReservedAddressesForWrite() const {
+    return &addresses_[descriptors_.max_size() * redundancy_];
+  }
+
+  // The number of copies of each entry.
+  size_t redundancy() const { return redundancy_; }
+
+  // True if no more entries can be added to the cache.
+  bool full() const { return descriptors_.full(); }
+
+  // The total number of entries, including tombstone entries.
+  size_t total_entries() const { return descriptors_.size(); }
+
+  // The total number of present (non-tombstone) entries.
+  size_t present_entries() const;
+
+  // The maximum number of entries supported by this EntryCache.
+  size_t max_entries() const { return descriptors_.max_size(); }
+
+  // Iterates over the EntryCache as EntryMetadata objects.
+  class iterator {
+   public:
+    iterator& operator++() {
+      ++metadata_.descriptor_;
+      return *this;
+    }
+    iterator& operator++(int) { return operator++(); }
+
+    // Updates the EntryMetadata object.
+    const EntryMetadata& operator*() const {
+      metadata_.addresses_ = entry_cache_.addresses(
+          metadata_.descriptor_ - entry_cache_.descriptors_.begin());
+      return metadata_;
+    }
+    const EntryMetadata* operator->() const { return &operator*(); }
+
+    constexpr bool operator==(const iterator& rhs) const {
+      return metadata_.descriptor_ == rhs.metadata_.descriptor_;
+    }
+    constexpr bool operator!=(const iterator& rhs) const {
+      return metadata_.descriptor_ != rhs.metadata_.descriptor_;
+    }
+
+   private:
+    friend class EntryCache;
+
+    constexpr iterator(const EntryCache* entry_cache, KeyDescriptor* descriptor)
+        : entry_cache_(*entry_cache), metadata_(*descriptor, {}) {}
+
+    const EntryCache& entry_cache_;
+
+    // Mark this mutable so it can be updated in the const operator*() method.
+    // This allows lazy updating of the EntryMetadata.
+    mutable EntryMetadata metadata_;
+  };
+
+  using const_iterator = iterator;
+
+  iterator begin() const { return iterator(this, descriptors_.begin()); }
+  iterator end() const { return iterator(this, descriptors_.end()); }
+
+ private:
+  int FindIndex(uint32_t key_hash) const;
+
+  // Adds the address to the descriptor at the specified index if there is an
+  // address slot available.
+  void AddAddressIfRoom(size_t descriptor_index, Address address);
+
+  // Returns a span of the valid addresses for the descriptor.
+  span<Address> addresses(size_t descriptor_index) const;
+
+  Address* first_address(size_t descriptor_index) const {
+    return &addresses_[descriptor_index * redundancy_];
+  }
+
+  Address* ResetAddresses(size_t descriptor_index, Address address);
+
+  Vector<KeyDescriptor>& descriptors_;
+  FlashPartition::Address* const addresses_;
+  const size_t redundancy_;
+};
+
+}  // namespace pw::kvs::internal
diff --git a/pw_kvs/public/pw_kvs/internal/key_descriptor.h b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
index aa915f9..a7c7183 100644
--- a/pw_kvs/public/pw_kvs/internal/key_descriptor.h
+++ b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
@@ -13,85 +13,19 @@
 // the License.
 #pragma once
 
-#include <cstddef>
 #include <cstdint>
-#include <string_view>
-
-#include "pw_containers/vector.h"
-#include "pw_kvs/flash_memory.h"
-#include "pw_kvs/internal/hash.h"
 
 namespace pw::kvs::internal {
 
-constexpr size_t kEntryRedundancy = 2;
+// Whether an entry is present or deleted.
+enum class EntryState : bool { kValid, kDeleted };
 
-// Caches information about a key-value entry. Facilitates quickly finding
-// entries without having to read flash.
-class KeyDescriptor {
- public:
-  enum State { kValid, kDeleted };
+// Essential metadata for an entry that is stored in memory.
+struct KeyDescriptor {
+  uint32_t key_hash;
+  uint32_t transaction_id;
 
-  uint32_t hash() const { return key_hash_; }
-  uint32_t transaction_id() const { return transaction_id_; }
-
-  // TODO: remove address() once all the use of it is gone.
-  uint32_t address() const { return addresses_[0]; }
-
-  Status UpdateAddress(FlashPartition::Address old_address,
-                       FlashPartition::Address new_address) {
-    for (auto& address : addresses()) {
-      if (address == old_address) {
-        address = new_address;
-        return Status::OK;
-      }
-    }
-
-    // Unable to find the address to update.
-    return Status::INVALID_ARGUMENT;
-  }
-
-  Vector<FlashPartition::Address, kEntryRedundancy>& addresses() {
-    return addresses_;
-  }
-  const Vector<FlashPartition::Address, kEntryRedundancy>& addresses() const {
-    return addresses_;
-  }
-
-  State state() const { return state_; }
-
-  // True if the KeyDesctiptor's transaction ID is newer than the specified ID.
-  bool IsNewerThan(uint32_t other_transaction_id) const {
-    // TODO: Consider handling rollover.
-    return transaction_id() > other_transaction_id;
-  }
-
-  bool deleted() const { return state_ == kDeleted; }
-
- private:
-  friend class Entry;
-
-  KeyDescriptor(uint32_t key_hash,
-                uint32_t version,
-                FlashPartition::Address address,
-                State initial_state)
-      : key_hash_(key_hash), transaction_id_(version), state_(initial_state) {
-    addresses_.assign(1, address);
-  }
-
-  KeyDescriptor(std::string_view key,
-                uint32_t version,
-                FlashPartition::Address address,
-                State initial_state)
-      : KeyDescriptor(Hash(key), version, address, initial_state) {}
-
-  uint32_t key_hash_;
-  uint32_t transaction_id_;
-
-  static_assert(kEntryRedundancy > 0u);
-  Vector<FlashPartition::Address, kEntryRedundancy> addresses_;
-
-  // TODO: This information could be packed into the above fields to save RAM.
-  State state_;
+  EntryState state;  // TODO: Pack the state into the fields above to save RAM.
 };
 
 }  // namespace pw::kvs::internal
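
The slimmed-down struct is a trivially copyable aggregate, which is what lets the
EntryCache keep descriptors in a plain Vector and copy them freely. A short
illustration (the hash and transaction ID values below are invented):

#include <type_traits>

#include "pw_kvs/internal/key_descriptor.h"

namespace {

using pw::kvs::internal::EntryState;
using pw::kvs::internal::KeyDescriptor;

// No user-declared constructors or private state, so copies are trivial.
static_assert(std::is_trivially_copyable_v<KeyDescriptor>);

// Aggregate initialization; 0x5eed5eed stands in for Hash(key).
[[maybe_unused]] constexpr KeyDescriptor kExample{
    0x5eed5eed, /*transaction_id=*/7, EntryState::kValid};

}  // namespace
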
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 9888e7f..8276c20 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -24,6 +24,7 @@
 #include "pw_kvs/flash_memory.h"
 #include "pw_kvs/format.h"
 #include "pw_kvs/internal/entry.h"
+#include "pw_kvs/internal/entry_cache.h"
 #include "pw_kvs/internal/key_descriptor.h"
 #include "pw_kvs/internal/sector_descriptor.h"
 #include "pw_kvs/internal/span_traits.h"
@@ -183,7 +184,7 @@
     return GarbageCollectPartial(span<const Address>());
   }
 
-  void LogDebugInfo();
+  void LogDebugInfo() const;
 
   // Classes and functions to support STL-style iteration.
   class iterator;
@@ -197,7 +198,7 @@
     // KeyValueStore::Get.
     StatusWithSize Get(span<std::byte> value_buffer,
                        size_t offset_bytes = 0) const {
-      return kvs_.Get(key(), *descriptor_, value_buffer, offset_bytes);
+      return kvs_.Get(key(), *iterator_, value_buffer, offset_bytes);
     }
 
     template <typename Pointer,
@@ -205,24 +206,24 @@
     Status Get(const Pointer& pointer) const {
       using T = std::remove_reference_t<std::remove_pointer_t<Pointer>>;
       CheckThatObjectCanBePutOrGet<T>();
-      return kvs_.FixedSizeGet(key(), *descriptor_, pointer, sizeof(T));
+      return kvs_.FixedSizeGet(key(), *iterator_, pointer, sizeof(T));
     }
 
     // Reads the size of the value referred to by this iterator. Equivalent to
     // KeyValueStore::ValueSize.
-    StatusWithSize ValueSize() const { return kvs_.ValueSize(*descriptor_); }
+    StatusWithSize ValueSize() const { return kvs_.ValueSize(*iterator_); }
 
    private:
     friend class iterator;
 
     constexpr Item(const KeyValueStore& kvs,
-                   const internal::KeyDescriptor* descriptor)
-        : kvs_(kvs), descriptor_(descriptor), key_buffer_{} {}
+                   const internal::EntryCache::iterator& iterator)
+        : kvs_(kvs), iterator_(iterator), key_buffer_{} {}
 
     void ReadKey();
 
     const KeyValueStore& kvs_;
-    const internal::KeyDescriptor* descriptor_;
+    internal::EntryCache::iterator iterator_;
 
     // Buffer large enough for a null-terminated version of any valid key.
     std::array<char, internal::Entry::kMaxKeyLength + 1> key_buffer_;
@@ -241,24 +242,23 @@
     }
 
     const Item* operator->() {
-      operator*();  // Read the key into the Item object.
-      return &item_;
+      return &operator*();  // Read the key into the Item object.
     }
 
     constexpr bool operator==(const iterator& rhs) const {
-      return item_.descriptor_ == rhs.item_.descriptor_;
+      return item_.iterator_ == rhs.item_.iterator_;
     }
 
     constexpr bool operator!=(const iterator& rhs) const {
-      return item_.descriptor_ != rhs.item_.descriptor_;
+      return item_.iterator_ != rhs.item_.iterator_;
     }
 
    private:
     friend class KeyValueStore;
 
     constexpr iterator(const KeyValueStore& kvs,
-                       const internal::KeyDescriptor* descriptor)
-        : item_(kvs, descriptor) {}
+                       const internal::EntryCache::iterator& iterator)
+        : item_(kvs, iterator) {}
 
     Item item_;
   };
@@ -266,12 +266,12 @@
   using const_iterator = iterator;  // Standard alias for iterable types.
 
   iterator begin() const;
-  iterator end() const { return iterator(*this, key_descriptors_.end()); }
+  iterator end() const { return iterator(*this, entry_cache_.end()); }
 
   // Returns the number of valid entries in the KeyValueStore.
-  size_t size() const;
+  size_t size() const { return entry_cache_.present_entries(); }
 
-  size_t max_size() const { return key_descriptors_.max_size(); }
+  size_t max_size() const { return entry_cache_.max_entries(); }
 
   size_t empty() const { return size() == 0u; }
 
@@ -288,7 +288,8 @@
 
   StorageStats GetStorageStats() const;
 
-  size_t redundancy() { return redundancy_; }
+  // Level of redundancy to use for writing entries.
+  size_t redundancy() const { return entry_cache_.redundancy(); }
 
  protected:
   using Address = FlashPartition::Address;
@@ -299,13 +300,18 @@
   // In the future, will be able to provide additional EntryFormats for
   // backwards compatibility.
   KeyValueStore(FlashPartition* partition,
-                Vector<KeyDescriptor>& key_descriptor_list,
-                Vector<SectorDescriptor>& sector_descriptor_list,
+                span<const EntryFormat> formats,
+                const Options& options,
                 size_t redundancy,
-                span<const EntryFormat> format,
-                const Options& options);
+                Vector<SectorDescriptor>& sector_descriptor_list,
+                const SectorDescriptor** temp_sectors_to_skip,
+                Vector<KeyDescriptor>& key_descriptor_list,
+                Address* addresses);
 
  private:
+  using EntryMetadata = internal::EntryMetadata;
+  using EntryState = internal::EntryState;
+
   template <typename T>
   static constexpr void CheckThatObjectCanBePutOrGet() {
     static_assert(
@@ -319,17 +325,13 @@
   Status ScanForEntry(const SectorDescriptor& sector,
                       Address start_address,
                       Address* next_entry_address);
-  Status AppendNewOrOverwriteStaleExistingDescriptor(
-      const KeyDescriptor& key_descriptor);
-
-  KeyDescriptor* FindDescriptor(uint32_t hash);
 
   Status PutBytes(std::string_view key, span<const std::byte> value);
 
-  StatusWithSize ValueSize(const KeyDescriptor& descriptor) const;
+  StatusWithSize ValueSize(const EntryMetadata& metadata) const;
 
   StatusWithSize Get(std::string_view key,
-                     const KeyDescriptor& descriptor,
+                     const EntryMetadata& metadata,
                      span<std::byte> value_buffer,
                      size_t offset_bytes) const;
 
@@ -338,32 +340,14 @@
                       size_t size_bytes) const;
 
   Status FixedSizeGet(std::string_view key,
-                      const KeyDescriptor& descriptor,
+                      const EntryMetadata& descriptor,
                       void* value,
                       size_t size_bytes) const;
 
   Status CheckOperation(std::string_view key) const;
 
-  Status FindKeyDescriptor(std::string_view key,
-                           const KeyDescriptor** result) const;
-
-  // Non-const version of FindKeyDescriptor.
-  Status FindKeyDescriptor(std::string_view key, KeyDescriptor** result) {
-    return static_cast<const KeyValueStore&>(*this).FindKeyDescriptor(
-        key, const_cast<const KeyDescriptor**>(result));
-  }
-
-  Status FindExistingKeyDescriptor(std::string_view key,
-                                   const KeyDescriptor** result) const;
-
-  Status FindExistingKeyDescriptor(std::string_view key,
-                                   KeyDescriptor** result) {
-    return static_cast<const KeyValueStore&>(*this).FindExistingKeyDescriptor(
-        key, const_cast<const KeyDescriptor**>(result));
-  }
-
-  Status WriteEntryForExistingKey(KeyDescriptor* key_descriptor,
-                                  KeyDescriptor::State new_state,
+  Status WriteEntryForExistingKey(EntryMetadata& metadata,
+                                  EntryState new_state,
                                   std::string_view key,
                                   span<const std::byte> value);
 
@@ -371,14 +355,14 @@
 
   Status WriteEntry(std::string_view key,
                     span<const std::byte> value,
-                    KeyDescriptor::State new_state,
-                    KeyDescriptor* prior_descriptor = nullptr,
+                    EntryState new_state,
+                    EntryMetadata* prior_metadata = nullptr,
                     size_t prior_size = 0);
 
-  KeyDescriptor& UpdateKeyDescriptor(const Entry& new_entry,
-                                     std::string_view key,
-                                     KeyDescriptor* prior_descriptor,
-                                     size_t prior_size);
+  EntryMetadata UpdateKeyDescriptor(const Entry& new_entry,
+                                    std::string_view key,
+                                    EntryMetadata* prior_metadata,
+                                    size_t prior_size);
 
   Status GetSectorForWrite(SectorDescriptor** sector,
                            size_t entry_size,
@@ -388,26 +372,25 @@
                      std::string_view key,
                      span<const std::byte> value);
 
-  Status RelocateEntry(KeyDescriptor& key_descriptor,
+  Status RelocateEntry(const EntryMetadata& metadata,
                        KeyValueStore::Address& address,
                        span<const Address> addresses_to_skip);
 
-  Status MoveEntry(Address new_address, Entry& entry);
-
   enum FindSectorMode { kAppendEntry, kGarbageCollect };
 
   Status FindSectorWithSpace(SectorDescriptor** found_sector,
                              size_t size,
                              FindSectorMode find_mode,
-                             span<const Address> addresses_to_skip);
+                             span<const Address> addresses_to_skip,
+                             span<const Address> reserved_addresses);
 
   SectorDescriptor* FindSectorToGarbageCollect(
       span<const Address> addresses_to_avoid);
 
   Status GarbageCollectPartial(span<const Address> addresses_to_skip);
 
-  Status RelocateKeyAddressesInSector(internal::SectorDescriptor& sector_to_gc,
-                                      internal::KeyDescriptor& descriptor,
+  Status RelocateKeyAddressesInSector(SectorDescriptor& sector_to_gc,
+                                      const EntryMetadata& descriptor,
                                       span<const Address> addresses_to_skip);
 
   Status GarbageCollectSector(SectorDescriptor* sector_to_gc,
@@ -428,13 +411,17 @@
     return SectorIndex(sector) * partition_.sector_size_bytes();
   }
 
-  SectorDescriptor* SectorFromAddress(const FlashPartition::Address address) {
+  SectorDescriptor* SectorFromAddress(Address address) {
     const size_t index = address / partition_.sector_size_bytes();
     // TODO: Add boundary checking once asserts are supported.
     // DCHECK_LT(index, sector_map_size_);
     return &sectors_[index];
   }
 
+  const SectorDescriptor* SectorFromAddress(Address address) const {
+    return &sectors_[address / partition_.sector_size_bytes()];
+  }
+
   Address NextWritableAddress(const SectorDescriptor* sector) const {
     return SectorBaseAddress(sector) + partition_.sector_size_bytes() -
            sector->writable_bytes();
@@ -443,7 +430,7 @@
   internal::Entry CreateEntry(Address address,
                               std::string_view key,
                               span<const std::byte> value,
-                              KeyDescriptor::State state);
+                              EntryState state);
 
   void LogSectors() const;
   void LogKeyDescriptor() const;
@@ -453,13 +440,14 @@
 
   // Unordered list of KeyDescriptors. Finding a key requires scanning and
   // verifying a match by reading the actual entry.
-  Vector<KeyDescriptor>& key_descriptors_;
+  internal::EntryCache entry_cache_;
 
   // List of sectors used by this KVS.
   Vector<SectorDescriptor>& sectors_;
 
-  // Level of redundancy to use for writing entries.
-  const size_t redundancy_;
+  // Temporary buffer with 2 * redundancy() - 1 entries, used by RelocateEntry,
+  // FindSectorWithSpace, and FindSectorToGarbageCollect.
+  const SectorDescriptor** const temp_sectors_to_skip_;
 
   Options options_;
 
@@ -503,11 +491,13 @@
                       span<const EntryFormat, kEntryFormats> formats,
                       const Options& options = {})
       : KeyValueStore(partition,
-                      key_descriptors_,
-                      sectors_,
-                      kRedundancy,
                       formats_,
-                      options) {
+                      options,
+                      kRedundancy,
+                      sectors_,
+                      temp_sectors_to_skip_,
+                      key_descriptors_,
+                      addresses_) {
     std::copy(formats.begin(), formats.end(), formats_.begin());
   }
 
@@ -515,11 +505,28 @@
   static_assert(kMaxEntries > 0u);
   static_assert(kMaxUsableSectors > 0u);
   static_assert(kRedundancy > 0u);
-  static_assert(kRedundancy <= internal::kEntryRedundancy);
   static_assert(kEntryFormats > 0u);
 
-  Vector<KeyDescriptor, kMaxEntries> key_descriptors_;
   Vector<SectorDescriptor, kMaxUsableSectors> sectors_;
+
+  // Allocate space for a list of SectorDescriptors to avoid while searching
+  // for space to write entries. This is used to avoid changing sectors that are
+  // reserved for a new entry or marked as having other redundant copies of an
+  // entry. Up to kRedundancy sectors are reserved for a new entry, and up to
+  // kRedundancy - 1 sectors can have redundant copies of an entry, giving a
+  // maximum of 2 * kRedundancy - 1 sectors to avoid.
+  const SectorDescriptor* temp_sectors_to_skip_[2 * kRedundancy - 1];
+
+  // KeyDescriptors for use by the KVS's EntryCache.
+  Vector<KeyDescriptor, kMaxEntries> key_descriptors_;
+
+  // An array of addresses associated with the KeyDescriptors for use with the
+  // EntryCache. To support having KeyValueStores with different redundancies,
+  // the addresses are stored in a parallel array instead of inside
+  // KeyDescriptors.
+  internal::EntryCache::AddressList<kRedundancy, kMaxEntries> addresses_;
+
+  // EntryFormats that can be read by this KeyValueStore.
   std::array<EntryFormat, kEntryFormats> formats_;
 };
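
To make the storage math concrete, a worked example with hypothetical template
parameters (kMaxEntries = 32, kRedundancy = 2; not values from this change):

#include <cstddef>
#include <cstdint>

constexpr size_t kMaxEntries = 32;
constexpr size_t kRedundancy = 2;

// One address slot per redundant copy of each entry.
constexpr size_t kAddressSlots = kMaxEntries * kRedundancy;
static_assert(kAddressSlots == 64);
static_assert(kAddressSlots * sizeof(uint32_t) == 256);  // bytes for addresses

// Up to kRedundancy sectors reserved for a new entry, plus up to
// kRedundancy - 1 sectors holding other redundant copies of an entry.
constexpr size_t kSectorsToSkip = 2 * kRedundancy - 1;
static_assert(kSectorsToSkip == 3);
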