pw_kvs: Implement KVS load from flash

This starts the implementation of loading the KVS from flash. Much
remains to do before this is done. This patch also includes some
extensions to how the tests are handled, to make it easier to
instantiate multiple KVS and flash backends for testing.

Changes is this CL to potentially split out:
- KVS dump command to log contents
- Re-enabling function and file display in log (useful!)
- Hex dumping to log

Change-Id: Ib1b25a01aa29eec3d3acd09e40205c23ded7bf6a
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d3d9001..c6ccca8 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -17,8 +17,10 @@
 #include <cstring>
 #include <type_traits>
 
+#define PW_LOG_USE_ULTRA_SHORT_NAMES 1
 #include "pw_kvs_private/format.h"
 #include "pw_kvs_private/macros.h"
+#include "pw_log/log.h"
 
 namespace pw::kvs {
 
@@ -40,16 +42,203 @@
   return hash;
 }
 
+// TODO: Remove this when checksums in test are implemented.
+const uint32_t kNoChecksum = 0x12345678;
+
 constexpr size_t EntrySize(string_view key, span<const byte> value) {
   return sizeof(EntryHeader) + key.size() + value.size();
 }
 
+typedef FlashPartition::Address Address;
+
 }  // namespace
 
 Status KeyValueStore::Init() {
-  // enabled_ = true;
+  // Reset the number of occupied key descriptors; we will fill them later.
+  key_descriptor_list_size_ = 0;
 
-  return Status::UNIMPLEMENTED;
+  const size_t sector_size_bytes = partition_.sector_size_bytes();
+  const size_t sector_count = partition_.sector_count();
+
+  DBG("First pass: Read all entries from all sectors");
+  for (size_t sector_id = 0; sector_id < sector_count; ++sector_id) {
+    // Track writable bytes in this sector. Updated after reading each entry.
+    sector_map_[sector_id].tail_free_bytes = sector_size_bytes;
+
+    const Address sector_address = sector_id * sector_size_bytes;
+    Address entry_address = sector_address;
+
+    for (int num_entries_in_sector = 0;; num_entries_in_sector++) {
+      DBG("Load entry: sector=%zu, entry#=%d, address=%zu",
+          sector_id,
+          num_entries_in_sector,
+          size_t(entry_address));
+
+      if (!AddressInSector(sector_map_[sector_id], entry_address)) {
+        DBG("Fell off end of sector; moving to the next sector");
+        break;
+      }
+
+      Address next_entry_address;
+      Status status = LoadEntry(entry_address, &next_entry_address);
+      if (status == Status::NOT_FOUND) {
+        DBG("Hit un-written data in sector; moving to the next sector");
+        break;
+      }
+      if (status == Status::DATA_LOSS) {
+        // It's not clear KVS can make a unilateral decision about what to do
+        // in corruption cases. It's an application decision, for which we
+        // should offer some configurability. For now, entirely bail out of
+        // loading and give up.
+        //
+        // Later, scan for remaining valid keys; since it's entirely possible
+        // that there is a duplicate of the key elsewhere and everything is
+        // fine. Later, we can wipe and maybe recover the sector.
+        //
+        // TODO: Implement rest-of-sector scanning for valid entries.
+        return Status::DATA_LOSS;
+      }
+      TRY(status);
+
+      // Entry loaded successfully; so get ready to load the next one.
+      entry_address = next_entry_address;
+
+      // Update of the number of writable bytes in this sector.
+      sector_map_[sector_id].tail_free_bytes =
+          sector_size_bytes - (entry_address - sector_address);
+    }
+  }
+
+  DBG("Second pass: Count valid bytes in each sector");
+  // Initialize the sector sizes.
+  for (size_t sector_id = 0; sector_id < sector_count; ++sector_id) {
+    sector_map_[sector_id].valid_bytes = 0;
+  }
+  // For every valid key, increment the valid bytes for that sector.
+  for (size_t key_id = 0; key_id < key_descriptor_list_size_; ++key_id) {
+    uint32_t sector_id =
+        key_descriptor_list_[key_id].address / sector_size_bytes;
+    KeyDescriptor key_descriptor;
+    key_descriptor.address = key_descriptor_list_[key_id].address;
+    EntryHeader header;
+    TRY(ReadEntryHeader(key_descriptor, &header));
+    sector_map_[sector_id].valid_bytes += header.entry_size();
+  }
+  enabled_ = true;
+  return Status::OK;
+}
+
+Status KeyValueStore::LoadEntry(Address entry_address,
+                                Address* next_entry_address) {
+  const size_t alignment_bytes = partition_.alignment_bytes();
+
+  KeyDescriptor key_descriptor;
+  key_descriptor.address = entry_address;
+
+  EntryHeader header;
+  TRY(ReadEntryHeader(key_descriptor, &header));
+  // TODO: Should likely add a "LogHeader" method or similar.
+  DBG("Header: ");
+  DBG("   Address      = 0x%zx", size_t(entry_address));
+  DBG("   Magic        = 0x%zx", size_t(header.magic()));
+  DBG("   Checksum     = 0x%zx", size_t(header.checksum_as_uint32()));
+  DBG("   Key length   = 0x%zx", size_t(header.key_length()));
+  DBG("   Value length = 0x%zx", size_t(header.value_length()));
+  DBG("   Entry size   = 0x%zx", size_t(header.entry_size()));
+  DBG("   Padded size  = 0x%zx",
+      size_t(AlignUp(header.entry_size(), alignment_bytes)));
+
+  if (HeaderLooksLikeUnwrittenData(header)) {
+    return Status::NOT_FOUND;
+  }
+  key_descriptor.key_version = header.key_version();
+
+  // TODO: Handle multiple magics for formats that have changed.
+  if (header.magic() != entry_header_format_.magic) {
+    // TODO: It may be cleaner to have some logging helpers for these cases.
+    CRT("Found corrupt magic: %zx; expecting %zx; at address %zx",
+        size_t(header.magic()),
+        size_t(entry_header_format_.magic),
+        size_t(entry_address));
+    return Status::DATA_LOSS;
+  }
+
+  // Read the key from flash & validate the entry (which reads the value).
+  KeyBuffer key_buffer;
+  TRY(ReadEntryKey(key_descriptor, header.key_length(), key_buffer.data()));
+  TRY(ValidateEntryChecksum(
+      header, string_view(key_buffer.data()), key_descriptor));
+  key_descriptor.key_hash = HashKey(string_view(key_buffer.data()));
+
+  DBG("Key hash: %zx (%zu)",
+      size_t(key_descriptor.key_hash),
+      size_t(key_descriptor.key_hash));
+
+  TRY(AppendNewOrOverwriteStaleExistingDescriptor(key_descriptor));
+
+  // TODO: Extract this to something like "NextValidEntryAddress".
+  *next_entry_address =
+      AlignUp(key_descriptor.address + header.entry_size(), alignment_bytes);
+
+  return Status::OK;
+}
+
+// TODO: This method is the trigger of the O(valid_entries * all_entries) time
+// complexity for reading. At some cost to memory, this could be optimized by
+// using a hash table instead of scanning, but in practice this should be fine
+// for a small number of keys
+Status KeyValueStore::AppendNewOrOverwriteStaleExistingDescriptor(
+    const KeyDescriptor& key_descriptor) {
+  // With the new key descriptor, either add it to the descriptor table or
+  // overwrite an existing entry with an older version of the key.
+  KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.key_hash);
+  if (existing_descriptor) {
+    if (existing_descriptor->key_version < key_descriptor.key_version) {
+      // Existing entry is old; replace the existing entry with the new one.
+      *existing_descriptor = key_descriptor;
+    } else {
+      // Otherwise, check for data integrity and leave the existing entry.
+      if (existing_descriptor->key_version == key_descriptor.key_version) {
+        ERR("Data loss: Duplicated old(=%zu) and new(=%zu) version",
+            size_t(existing_descriptor->key_version),
+            size_t(key_descriptor.key_version));
+        return Status::DATA_LOSS;
+      }
+      DBG("Found stale entry when appending; ignoring");
+    }
+    return Status::OK;
+  }
+  // Write new entry.
+  KeyDescriptor* newly_allocated_key_descriptor;
+  TRY(AppendEmptyDescriptor(&newly_allocated_key_descriptor));
+  *newly_allocated_key_descriptor = key_descriptor;
+  return Status::OK;
+}
+
+// TODO: Need a better name.
+Status KeyValueStore::AppendEmptyDescriptor(KeyDescriptor** new_descriptor) {
+  if (KeyListFull()) {
+    // TODO: Is this the right return code?
+    return Status::RESOURCE_EXHAUSTED;
+  }
+  *new_descriptor = &key_descriptor_list_[key_descriptor_list_size_++];
+  return Status::OK;
+}
+
+// TODO: Finish.
+bool KeyValueStore::HeaderLooksLikeUnwrittenData(
+    const EntryHeader& header) const {
+  // TODO: This is not correct; it should call through to flash memory.
+  return header.magic() == 0xffffffff;
+}
+
+KeyValueStore::KeyDescriptor* KeyValueStore::FindDescriptor(uint32_t hash) {
+  for (size_t key_id = 0; key_id < key_descriptor_list_size_; key_id++) {
+    if (key_descriptor_list_[key_id].key_hash == hash) {
+      return &(key_descriptor_list_[key_id]);
+    }
+  }
+  return nullptr;
 }
 
 StatusWithSize KeyValueStore::Get(string_view key,
@@ -61,8 +250,8 @@
 
   EntryHeader header;
   TRY(ReadEntryHeader(*key_descriptor, &header));
-  StatusWithSize result = ReadEntryValue(*key_descriptor, header, value_buffer);
 
+  StatusWithSize result = ReadEntryValue(*key_descriptor, header, value_buffer);
   if (result.ok() && options_.verify_on_read) {
     return ValidateEntryChecksum(header, key, value_buffer);
   }
@@ -70,6 +259,9 @@
 }
 
 Status KeyValueStore::Put(string_view key, span<const byte> value) {
+  DBG("Writing key/value; key length=%zu, value length=%zu",
+      key.size(),
+      value.size());
   TRY(InvalidOperation(key));
 
   if (value.size() > (1 << 24)) {
@@ -78,15 +270,16 @@
 
   KeyDescriptor* key_descriptor;
   if (FindKeyDescriptor(key, &key_descriptor).ok()) {
+    DBG("Writing over existing entry");
     return WriteEntryForExistingKey(key_descriptor, key, value);
   }
 
+  DBG("Writing new entry");
   return WriteEntryForNewKey(key, value);
 }
 
 Status KeyValueStore::Delete(string_view key) {
   TRY(InvalidOperation(key));
-
   return Status::UNIMPLEMENTED;
 }
 
@@ -103,11 +296,36 @@
   return entry_;
 }
 
+Status KeyValueStore::ValidateEntryChecksum(const EntryHeader& header,
+                                            string_view key,
+                                            const KeyDescriptor& entry) const {
+  // TODO: Remove this block once the checksums are implemented in test.
+  if (entry_header_format_.checksum == nullptr) {
+    const auto header_checksum = header.checksum();
+    const auto fixed_checksum = as_bytes(span(&kNoChecksum, 1));
+    if (std::memcmp(header_checksum.data(),
+                    fixed_checksum.data(),
+                    header_checksum.size()) == 0) {
+      return Status::OK;
+    }
+    return Status::DATA_LOSS;
+
+    // TODO: It's unclear why the below version doesn't work.
+    // return header.checksum() == as_bytes(span(&kNoChecksum, 1));
+  }
+  (void)(header);
+  (void)(key);
+  (void)(entry);
+  // TODO: Do incremental checksum verification by reading the value from
+  // flash a few blocks at a time.
+  return Status::OK;
+}
+
 Status KeyValueStore::InvalidOperation(string_view key) const {
   if (InvalidKey(key)) {
     return Status::INVALID_ARGUMENT;
   }
-  if (!/*enabled_*/ 0) {
+  if (!enabled_) {
     return Status::FAILED_PRECONDITION;
   }
   return Status::OK;
@@ -120,9 +338,11 @@
 
   for (auto& descriptor : key_descriptors()) {
     if (descriptor.key_hash == hash) {
+      DBG("Found match! For hash: %zx", size_t(hash));
       TRY(ReadEntryKey(descriptor, key.size(), key_buffer));
 
       if (key == string_view(key_buffer, key.size())) {
+        DBG("Keys matched too");
         *result = &descriptor;
         return Status::OK;
       }
@@ -161,7 +381,7 @@
   const size_t read_size = std::min(header.value_length(), value.size());
   StatusWithSize result = partition_.Read(
       key_descriptor.address + sizeof(header) + header.key_length(),
-      value.subspan(read_size));
+      value.subspan(0, read_size));
   TRY(result);
   if (read_size != header.value_length()) {
     return StatusWithSize(Status::RESOURCE_EXHAUSTED, read_size);
@@ -172,6 +392,9 @@
 Status KeyValueStore::ValidateEntryChecksum(const EntryHeader& header,
                                             string_view key,
                                             span<const byte> value) const {
+  // TODO: Re-enable this when we have checksum implementations.
+  return Status::OK;
+
   CalculateEntryChecksum(header, key, value);
   return entry_header_format_.checksum->Verify(header.checksum());
 }
@@ -181,13 +404,16 @@
                                                span<const byte> value) {
   SectorDescriptor* sector;
   TRY(FindOrRecoverSectorWithSpace(&sector, EntrySize(key, value)));
+  DBG("Writing existing entry; found sector: %d",
+      static_cast<int>(sector - sector_map_.data()));
   return AppendEntry(sector, key_descriptor, key, value);
 }
 
 Status KeyValueStore::WriteEntryForNewKey(string_view key,
                                           span<const byte> value) {
   if (KeyListFull()) {
-    // TODO: Log, and also expose "in memory keymap full" in stats dump.
+    WRN("KVS full: trying to store a new entry, but can't. Have %zu entries",
+        key_descriptor_list_size_);
     return Status::RESOURCE_EXHAUSTED;
   }
 
@@ -201,17 +427,19 @@
 
   SectorDescriptor* sector;
   TRY(FindOrRecoverSectorWithSpace(&sector, EntrySize(key, value)));
+  DBG("Writing new entry; found sector: %d",
+      static_cast<int>(sector - sector_map_.data()));
   TRY(AppendEntry(sector, &key_descriptor, key, value));
 
-  // Only increment key_descriptor_list_size_ when we are certain the write
-  // succeeded.
+  // Only increment bump our size when we are certain the write succeeded.
   key_descriptor_list_size_ += 1;
   return Status::OK;
 }
 
 Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor) {
-  // TODO: implement me
+  // TODO: Implement this function!
   (void)key_descriptor;
+  return Status::OK;
   return Status::UNIMPLEMENTED;
 }
 
@@ -219,14 +447,22 @@
 // Maintains the invariant that there is always at least 1 empty sector.
 KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorWithSpace(
     size_t size) {
-  int start = (last_written_sector_ + 1) % sector_map_.size();
+  const size_t sector_count = partition_.sector_count();
+
+  // TODO: Ignore last written sector for now and scan from the beginning.
+  last_written_sector_ = sector_count - 1;
+
+  size_t start = (last_written_sector_ + 1) % sector_count;
   SectorDescriptor* first_empty_sector = nullptr;
   bool at_least_two_empty_sectors = false;
 
   for (size_t i = start; i != last_written_sector_;
-       i = (i + 1) % sector_map_.size()) {
+       i = (i + 1) % sector_count) {
+    DBG("Examining sector %zu", i);
     SectorDescriptor& sector = sector_map_[i];
+
     if (!SectorEmpty(sector) && sector.HasSpace(size)) {
+      DBG("Partially occupied sector with enough space; done!");
       return &sector;
     }
 
@@ -240,6 +476,8 @@
   }
 
   if (at_least_two_empty_sectors) {
+    DBG("Found a empty sector and a spare one; returning the first found (%d)",
+        static_cast<int>(first_empty_sector - sector_map_.data()));
     return first_empty_sector;
   }
   return nullptr;
@@ -326,10 +564,12 @@
                            key.size(),
                            value.size(),
                            key_descriptor->key_version + 1);
+  DBG("Appending entry with key version: %zx", size_t(header.key_version()));
 
   // Handles writing multiple concatenated buffers, while breaking up the writes
   // into alignment-sized blocks.
   Address address = NextWritableAddress(sector);
+  DBG("Appending to address: %zx", size_t(address));
   TRY_ASSIGN(
       size_t written,
       partition_.Write(
@@ -350,6 +590,12 @@
 
 Status KeyValueStore::VerifyEntry(SectorDescriptor* sector,
                                   KeyDescriptor* key_descriptor) {
+  if (entry_header_format_.checksum == nullptr) {
+    // TODO: Remove this once checksums are fully implemented.
+    return Status::OK;
+  }
+  return Status::UNIMPLEMENTED;
+
   // TODO: Implement me!
   (void)sector;
   (void)key_descriptor;
@@ -360,6 +606,9 @@
     const EntryHeader& header,
     const string_view key,
     span<const byte> value) const {
+  if (entry_header_format_.checksum == nullptr) {
+    return as_bytes(span(&kNoChecksum, 1));
+  }
   auto& checksum = *entry_header_format_.checksum;
 
   checksum.Reset();
@@ -369,4 +618,80 @@
   return checksum.state();
 }
 
+void KeyValueStore::LogDebugInfo() {
+  const size_t sector_count = partition_.sector_count();
+  const size_t sector_size_bytes = partition_.sector_size_bytes();
+  DBG("====================== KEY VALUE STORE DUMP =========================");
+  DBG(" ");
+  DBG("Flash partition:");
+  DBG("  Sector count     = %zu", sector_count);
+  DBG("  Sector max count = %zu", kUsableSectors);
+  DBG("  Sector size      = %zu", sector_size_bytes);
+  DBG("  Total size       = %zu", partition_.size_bytes());
+  DBG("  Alignment        = %zu", partition_.alignment_bytes());
+  DBG(" ");
+  DBG("Key descriptors:");
+  DBG("  Entry count     = %zu", key_descriptor_list_size_);
+  DBG("  Max entry count = %zu", kMaxEntries);
+  DBG(" ");
+  DBG("      #     hash        version    address   address (hex)");
+  for (size_t i = 0; i < key_descriptor_list_size_; ++i) {
+    const KeyDescriptor& kd = key_descriptor_list_[i];
+    DBG("   |%3zu: | %8zx  |%8zu  | %8zu | %8zx",
+        i,
+        size_t(kd.key_hash),
+        size_t(kd.key_version),
+        size_t(kd.address),
+        size_t(kd.address));
+  }
+  DBG(" ");
+
+  DBG("Sector descriptors:");
+  DBG("      #     tail free  valid    has_space");
+  for (size_t sector_id = 0; sector_id < sector_count; ++sector_id) {
+    const SectorDescriptor& sd = sector_map_[sector_id];
+    DBG("   |%3zu: | %8zu  |%8zu  | %s",
+        sector_id,
+        size_t(sd.tail_free_bytes),
+        size_t(sd.valid_bytes),
+        sd.tail_free_bytes ? "YES" : "");
+  }
+  DBG(" ");
+
+  // TODO: This should stop logging after some threshold.
+  // size_t dumped_bytes = 0;
+  DBG("Sector raw data:");
+  for (size_t sector_id = 0; sector_id < sector_count; ++sector_id) {
+    // Read sector data. Yes, this will blow the stack on embedded.
+    std::array<byte, 500> raw_sector_data;  // TODO
+    StatusWithSize sws =
+        partition_.Read(sector_id * sector_size_bytes, raw_sector_data);
+    DBG("Read: %zu bytes", sws.size());
+
+    DBG("  base    addr  offs   0  1  2  3  4  5  6  7");
+    for (size_t i = 0; i < sector_size_bytes; i += 8) {
+      DBG("  %3zu %8zx %5zu | %02x %02x %02x %02x %02x %02x %02x %02x",
+          sector_id,
+          (sector_id * sector_size_bytes) + i,
+          i,
+          static_cast<unsigned int>(raw_sector_data[i + 0]),
+          static_cast<unsigned int>(raw_sector_data[i + 1]),
+          static_cast<unsigned int>(raw_sector_data[i + 2]),
+          static_cast<unsigned int>(raw_sector_data[i + 3]),
+          static_cast<unsigned int>(raw_sector_data[i + 4]),
+          static_cast<unsigned int>(raw_sector_data[i + 5]),
+          static_cast<unsigned int>(raw_sector_data[i + 6]),
+          static_cast<unsigned int>(raw_sector_data[i + 7]));
+
+      // TODO: Fix exit condition.
+      if (i > 128) {
+        break;
+      }
+    }
+    DBG(" ");
+  }
+
+  DBG("////////////////////// KEY VALUE STORE DUMP END /////////////////////");
+}
+
 }  // namespace pw::kvs
diff --git a/pw_kvs/key_value_store_test.cc b/pw_kvs/key_value_store_test.cc
index 3b098a5..b839d2e 100644
--- a/pw_kvs/key_value_store_test.cc
+++ b/pw_kvs/key_value_store_test.cc
@@ -16,11 +16,17 @@
 
 #include <array>
 #include <cstdio>
+#if defined(__linux__)
+#include <vector>
+#endif  // defined(__linux__)
 #include <cstring>
 #include <type_traits>
 
 #include "pw_span/span.h"
 
+#define PW_LOG_USE_ULTRA_SHORT_NAMES 1
+#include "pw_log/log.h"
+
 #define USE_MEMORY_BUFFER 1
 
 #include "gtest/gtest.h"
@@ -30,6 +36,7 @@
 #include "pw_kvs_private/macros.h"
 #include "pw_log/log.h"
 #include "pw_status/status.h"
+#include "pw_string/string_builder.h"
 
 #if USE_MEMORY_BUFFER
 #include "pw_kvs/in_memory_fake_flash.h"
@@ -70,6 +77,58 @@
 static_assert(ConvertsToSpan<const span<bool>&>());
 static_assert(ConvertsToSpan<span<bool>&&>());
 
+// This is a self contained flash unit with both memory and a single partition.
+template <uint32_t sector_size_bytes, uint16_t sector_count>
+struct FlashWithPartitionFake {
+  // Default to 16 byte alignment, which is common in practice.
+  FlashWithPartitionFake() : FlashWithPartitionFake(16) {}
+  FlashWithPartitionFake(size_t alignment_bytes)
+      : memory(alignment_bytes), partition(&memory, 0, memory.sector_count()) {}
+
+  InMemoryFakeFlash<sector_size_bytes, sector_count> memory;
+  FlashPartition partition;
+
+ public:
+#if defined(__linux__)
+  Status Dump(const char* filename) {
+    std::FILE* out_file = std::fopen(filename, "w+");
+    if (out_file == nullptr) {
+      PW_LOG_ERROR("Failed to dump to %s", filename);
+      return Status::DATA_LOSS;
+    }
+    std::vector<std::byte> out_vec(memory.size_bytes());
+    Status status =
+        memory.Read(0, pw::span<std::byte>(out_vec.data(), out_vec.size()));
+    if (status != Status::OK) {
+      fclose(out_file);
+      return status;
+    }
+
+    size_t written =
+        std::fwrite(out_vec.data(), 1, memory.size_bytes(), out_file);
+    if (written != memory.size_bytes()) {
+      PW_LOG_ERROR("Failed to dump to %s, written=%u",
+                   filename,
+                   static_cast<unsigned>(written));
+      status = Status::DATA_LOSS;
+    } else {
+      PW_LOG_INFO("Dumped to %s", filename);
+      status = Status::OK;
+    }
+
+    fclose(out_file);
+    return status;
+  }
+#else
+  Status Dump(const char* filename) {
+    (void)(filename);
+    return Status::OK;
+  }
+#endif  // defined(__linux__)
+};
+
+typedef FlashWithPartitionFake<4 * 128 /*sector size*/, 6 /*sectors*/> Flash;
+
 #if USE_MEMORY_BUFFER
 // Although it might be useful to test other configurations, some tests require
 // at least 3 sectors; therfore it should have this when checked in.
@@ -338,6 +397,151 @@
   EXPECT_EQ(kvs.size(), 0u);
 }
 
+#define ASSERT_OK(expr) ASSERT_EQ(Status::OK, expr)
+#define EXPECT_OK(expr) EXPECT_EQ(Status::OK, expr)
+
+#define AS_SIZE(x) static_cast<size_t>(x)
+
+TEST(InMemoryKvs, DISABLED_WriteOneKeyMultipleTimes) {
+  // Create and erase the fake flash. It will persist across reloads.
+  Flash flash;
+  ASSERT_OK(flash.partition.Erase());
+
+  int num_reloads = 2;
+  for (int reload = 0; reload < num_reloads; ++reload) {
+    DBG("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+    DBG("xxx                                      xxxx");
+    DBG("xxx               Reload %2d              xxxx", reload);
+    DBG("xxx                                      xxxx");
+    DBG("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+
+    // Create and initialize the KVS.
+    constexpr EntryHeaderFormat format{.magic = 0xBAD'C0D3,
+                                       .checksum = nullptr};
+    KeyValueStore kvs(&flash.partition, format);
+    ASSERT_OK(kvs.Init());
+
+    // Write the same entry many times.
+    const char* key = "abcd";
+    const size_t num_writes = 1;  // TODO: Make this > 1 when things work.
+    uint32_t written_value;
+    EXPECT_EQ(kvs.size(), (reload == 0) ? 0 : 1u);
+    for (uint32_t i = 0; i < num_writes; ++i) {
+      INF("PUT #%zu for key %s with value %zu", AS_SIZE(i), key, AS_SIZE(i));
+
+      written_value = i + 0xfc;  // Prevent accidental pass with zero.
+      EXPECT_OK(kvs.Put(key, written_value));
+      EXPECT_EQ(kvs.size(), 1u);
+    }
+
+    // Verify that we can read the value back.
+    INF("GET final value for key: %s", key);
+    uint32_t actual_value;
+    EXPECT_OK(kvs.Get(key, &actual_value));
+    EXPECT_EQ(actual_value, written_value);
+
+    kvs.LogDebugInfo();
+
+    char fname_buf[64] = {'\0'};
+    snprintf(&fname_buf[0],
+             sizeof(fname_buf),
+             "WriteOneKeyMultipleTimes_%d.bin",
+             reload);
+    flash.Dump(fname_buf);
+  }
+}
+
+TEST(InMemoryKvs, WritingMultipleKeysIncreasesSize) {
+  // Create and erase the fake flash.
+  Flash flash;
+  ASSERT_OK(flash.partition.Erase());
+
+  // Create and initialize the KVS.
+  constexpr EntryHeaderFormat format{.magic = 0xBAD'C0D3, .checksum = nullptr};
+  KeyValueStore kvs(&flash.partition, format);
+  ASSERT_OK(kvs.Init());
+
+  // Write the same entry many times.
+  const size_t num_writes = 10;
+  EXPECT_EQ(kvs.size(), 0u);
+  for (size_t i = 0; i < num_writes; ++i) {
+    StringBuffer<150> key;
+    key << "key_" << i;
+    INF("PUT #%zu for key %s with value %zu", i, key.c_str(), i);
+
+    size_t value = i + 77;  // Prevent accidental pass with zero.
+    EXPECT_OK(kvs.Put(key.view(), value));
+    EXPECT_EQ(kvs.size(), i + 1);
+  }
+  kvs.LogDebugInfo();
+  flash.Dump("WritingMultipleKeysIncreasesSize.bin");
+}
+
+TEST(InMemoryKvs, WriteAndReadOneKey) {
+  // Create and erase the fake flash.
+  Flash flash;
+  ASSERT_OK(flash.partition.Erase());
+
+  // Create and initialize the KVS.
+  constexpr EntryHeaderFormat format{.magic = 0xBAD'C0D3, .checksum = nullptr};
+  KeyValueStore kvs(&flash.partition, format);
+  ASSERT_OK(kvs.Init());
+
+  // Add two entries with different keys and values.
+  const char* key = "Key1";
+  INF("PUT value for key: %s", key);
+  uint8_t written_value = 0xDA;
+  ASSERT_OK(kvs.Put(key, written_value));
+  EXPECT_EQ(kvs.size(), 1u);
+
+  INF("GET value for key: %s", key);
+  uint8_t actual_value;
+  ASSERT_OK(kvs.Get(key, &actual_value));
+  EXPECT_EQ(actual_value, written_value);
+
+  EXPECT_EQ(kvs.size(), 1u);
+}
+
+TEST(InMemoryKvs, Basic) {
+  const char* key1 = "Key1";
+  const char* key2 = "Key2";
+
+  // Create and erase the fake flash.
+  Flash flash;
+  ASSERT_EQ(Status::OK, flash.partition.Erase());
+
+  // Create and initialize the KVS.
+  constexpr EntryHeaderFormat format{.magic = 0xBAD'C0D3, .checksum = nullptr};
+  KeyValueStore kvs(&flash.partition, format);
+  ASSERT_OK(kvs.Init());
+
+  // Add two entries with different keys and values.
+  INF("PUT first value");
+  uint8_t value1 = 0xDA;
+  ASSERT_OK(kvs.Put(key1, as_bytes(span(&value1, sizeof(value1)))));
+  EXPECT_EQ(kvs.size(), 1u);
+
+  INF("PUT second value");
+  uint32_t value2 = 0xBAD0301f;
+  ASSERT_OK(kvs.Put(key2, value2));
+  EXPECT_EQ(kvs.size(), 2u);
+
+  INF("--------------------------------");
+  INF("GET second value");
+  // Verify data
+  uint32_t test2;
+  EXPECT_OK(kvs.Get(key2, &test2));
+
+  INF("GET first value");
+  uint8_t test1;
+  ASSERT_OK(kvs.Get(key1, &test1));
+
+  EXPECT_EQ(test1, value1);
+  EXPECT_EQ(test2, value2);
+
+  EXPECT_EQ(kvs.size(), 2u);
+}
+
 TEST_F(KeyValueStoreTest, DISABLED_MaxKeyLength) {
   // Erase
   ASSERT_EQ(Status::OK, test_partition.Erase(0, test_partition.sector_count()));
@@ -700,7 +904,7 @@
       0xAA, 0x55, 0xB5, 0x87, 0x00, 0x00, 0x44, 0x00,  // Header (GOOD CRC)
       0x4B, 0x65, 0x79, 0x32,                          // Key (keys[1])
       0x1F, 0x30, 0xD0, 0xBA                           // Value
-  );                        
+  );
   static constexpr auto kKvsTestDataAligned8Top = ByteArray(
       0xCD, 0xAB, 0x03, 0x00, 0x08, 0x00, 0xFF, 0xFF   // Sector Header
   );
@@ -1475,4 +1679,28 @@
 }
 #endif  // USE_MEMORY_BUFFER
 
+TEST(KeyValueStoreEntryHeader, KeyValueSizes) {
+  EntryHeader header;
+
+  header.set_key_length(9u);
+  EXPECT_EQ(header.key_length(), 9u);
+
+  header.set_value_length(11u);
+  EXPECT_EQ(header.value_length(), 11u);
+
+  header.set_key_length(6u);
+  header.set_value_length(100u);
+  EXPECT_EQ(header.key_length(), 6u);
+  EXPECT_EQ(header.value_length(), 100u);
+
+  header.set_value_length(10u);
+  EXPECT_EQ(header.key_length(), 6u);
+  EXPECT_EQ(header.value_length(), 10u);
+
+  header.set_key_length(3u);
+  header.set_value_length(4000u);
+  EXPECT_EQ(header.key_length(), 3u);
+  EXPECT_EQ(header.value_length(), 4000u);
+}
+
 }  // namespace pw::kvs
diff --git a/pw_kvs/public/pw_kvs/flash_memory.h b/pw_kvs/public/pw_kvs/flash_memory.h
index 06a5148..d61a896 100644
--- a/pw_kvs/public/pw_kvs/flash_memory.h
+++ b/pw_kvs/public/pw_kvs/flash_memory.h
@@ -156,6 +156,8 @@
   //          UNKNOWN, on HAL error
   virtual Status Erase(Address address, size_t num_sectors);
 
+  Status Erase() { return Erase(0, this->sector_count()); }
+
   // Reads bytes from flash into buffer. Blocking call.
   // Returns: OK, on success.
   //          TIMEOUT, on timeout.
@@ -192,7 +194,7 @@
   // Overridden by derived classes. The reported sector size is space available
   // to users of FlashPartition. It accounts for space reserved in the sector
   // for FlashPartition to store metadata.
-  virtual uint32_t sector_size_bytes() const {
+  virtual size_t sector_size_bytes() const {
     return flash_.sector_size_bytes();
   }
 
diff --git a/pw_kvs/public/pw_kvs/in_memory_fake_flash.h b/pw_kvs/public/pw_kvs/in_memory_fake_flash.h
index 901684e..741f46d 100644
--- a/pw_kvs/public/pw_kvs/in_memory_fake_flash.h
+++ b/pw_kvs/public/pw_kvs/in_memory_fake_flash.h
@@ -16,7 +16,10 @@
 #include <array>
 #include <cstring>
 
+// TODO: Push/pop log module name due to logging in header.
+//       Alternately: Push implementation into .cc
 #include "pw_kvs/flash_memory.h"
+#include "pw_log/log.h"
 #include "pw_status/status.h"
 
 namespace pw::kvs {
@@ -26,8 +29,8 @@
 template <uint32_t kSectorSize, uint16_t kSectorCount>
 class InMemoryFakeFlash : public FlashMemory {
  public:
-  InMemoryFakeFlash(uint8_t alignment = 1)  // default 8 bit alignment
-      : FlashMemory(kSectorSize, kSectorCount, alignment) {}
+  InMemoryFakeFlash(uint8_t alignment_bytes = 1)  // default 8 bit alignment
+      : FlashMemory(kSectorSize, kSectorCount, alignment_bytes) {}
 
   // Always enabled
   Status Enable() override { return Status::OK; }
@@ -39,17 +42,24 @@
   // Returns: OK, on success.
   //          INVALID_ARGUMENT, if address or sector count is invalid.
   //          UNKNOWN, on HAL error
-  Status Erase(Address addr, size_t num_sectors) override {
-    if (addr % sector_size_bytes() != 0) {
+  Status Erase(Address address, size_t num_sectors) override {
+    if (address % sector_size_bytes() != 0) {
+      ERR("Attempted to erase sector at non-sector aligned boundary: %zx",
+          size_t(address));
       return Status::INVALID_ARGUMENT;
     }
-    if (addr / sector_size_bytes() + num_sectors > sector_count()) {
+    size_t sector_id = address / sector_size_bytes();
+    if (address / sector_size_bytes() + num_sectors > sector_count()) {
+      ERR("Tried to erase a sector at an address past partition end; "
+          "address: %zx, sector implied: %zu",
+          size_t(address),
+          sector_id);
       return Status::UNKNOWN;
     }
-    if (addr % alignment_bytes() != 0) {
+    if (address % alignment_bytes() != 0) {
       return Status::INVALID_ARGUMENT;
     }
-    std::memset(&buffer_[addr], 0xFF, sector_size_bytes() * num_sectors);
+    std::memset(&buffer_[address], 0xFF, sector_size_bytes() * num_sectors);
     return Status::OK;
   }
 
@@ -78,6 +88,7 @@
     // Check in erased state
     for (unsigned i = 0; i < data.size(); i++) {
       if (buffer_[address + i] != 0xFF) {
+        ERR("Writing to previously written address: %zx", size_t(address));
         return Status::UNKNOWN;
       }
     }
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index a36f03c..d109f22 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -24,6 +24,8 @@
 #include "pw_status/status.h"
 #include "pw_status/status_with_size.h"
 
+// TODO: Resolve uses of partition_.sector_count() vs kMaxUsableSectors.
+
 namespace pw::kvs {
 namespace internal {
 
@@ -69,6 +71,9 @@
   static constexpr size_t kMaxEntries = 64;
   static constexpr size_t kUsableSectors = 64;
 
+  // +1 for null-terminator.
+  typedef std::array<char, kMaxKeyLength + 1> KeyBuffer;
+
   // In the future, will be able to provide additional EntryHeaderFormats for
   // backwards compatibility.
   constexpr KeyValueStore(FlashPartition* partition,
@@ -92,6 +97,10 @@
     static_assert(std::is_trivially_copyable<T>(), "KVS values must copyable");
     static_assert(!std::is_pointer<T>(), "KVS values cannot be pointers");
 
+    // TODO: Re-enable this check once we are further along.
+#if 0
+    // Ensure that the size of the stored value matches the size of the type.
+    // Otherwise, report error. This check avoids potential memory corruption.
     StatusWithSize result = ValueSize(key);
     if (!result.ok()) {
       return result.status();
@@ -99,6 +108,7 @@
     if (result.size() != sizeof(T)) {
       return Status::INVALID_ARGUMENT;
     }
+#endif
     return Get(key, as_writable_bytes(span(value, 1))).status();
   }
 
@@ -115,10 +125,14 @@
   Status Delete(std::string_view key);
 
   StatusWithSize ValueSize(std::string_view key) const {
+    // TODO: Implement this! For now, just accept whatever size.
+    // return Status::OK;
     (void)key;
     return Status::UNIMPLEMENTED;
   }
 
+  void LogDebugInfo();
+
   // Classes and functions to support STL-style iteration.
   class Iterator;
 
@@ -243,6 +257,16 @@
                                std::string_view key,
                                span<const std::byte> value) const;
 
+  Status LoadEntry(Address entry_address, Address* next_entry_address);
+  Status AppendNewOrOverwriteStaleExistingDescriptor(
+      const KeyDescriptor& key_descriptor);
+  Status AppendEmptyDescriptor(KeyDescriptor** new_descriptor);
+
+  // This version reads from flash.
+  Status ValidateEntryChecksum(const EntryHeader& header,
+                               std::string_view key,
+                               const KeyDescriptor& entry) const;
+
   Status WriteEntryForExistingKey(KeyDescriptor* key_descriptor,
                                   std::string_view key,
                                   span<const std::byte> value);
@@ -259,6 +283,10 @@
 
   SectorDescriptor* FindSectorToGarbageCollect();
 
+  bool HeaderLooksLikeUnwrittenData(const EntryHeader& header) const;
+
+  KeyDescriptor* FindDescriptor(uint32_t hash);
+
   Status AppendEntry(SectorDescriptor* sector,
                      KeyDescriptor* key_descriptor,
                      std::string_view key,
@@ -318,7 +346,9 @@
 
   // This is dense, so sector_id == indexof(SectorDescriptor) in sector_map
   std::array<SectorDescriptor, kUsableSectors> sector_map_;
-  size_t last_written_sector_;  // TODO: this variable is not used!
+  size_t last_written_sector_;
+
+  bool enabled_ = false;
 };
 
 }  // namespace pw::kvs
diff --git a/pw_kvs/pw_kvs_private/format.h b/pw_kvs/pw_kvs_private/format.h
index 25484c7..9cd470c 100644
--- a/pw_kvs/pw_kvs_private/format.h
+++ b/pw_kvs/pw_kvs_private/format.h
@@ -23,7 +23,7 @@
 
 // In-flash header format.
 struct EntryHeader {
-  EntryHeader() = default;
+  EntryHeader() : key_value_length_(0) {}
 
   EntryHeader(uint32_t magic,
               span<const std::byte> checksum,
@@ -46,12 +46,29 @@
                 sizeof(*this) - offsetof(EntryHeader, key_value_length_));
   }
 
+  size_t entry_size() const {
+    return sizeof(*this) + key_length() + value_length();
+  }
+
   uint32_t magic() const { return magic_; }
+
   span<const std::byte> checksum() const {
     return as_bytes(span(&checksum_, 1));
   }
+
+  uint32_t checksum_as_uint32() const { return checksum_; }
+
   size_t key_length() const { return key_value_length_ & kKeyLengthMask; }
+  void set_key_length(uint32_t key_length) {
+    key_value_length_ = key_length | (~kKeyLengthMask & key_value_length_);
+  }
+
   size_t value_length() const { return key_value_length_ >> kValueLengthShift; }
+  void set_value_length(uint32_t value_length) {
+    key_value_length_ = (value_length << kValueLengthShift) |
+                        (kKeyLengthMask & key_value_length_);
+  }
+
   uint32_t key_version() const { return key_version_; }
 
  private:
@@ -60,7 +77,9 @@
 
   uint32_t magic_;
   uint32_t checksum_;
-  // 0:5 key_len 8:31 value_len
+  //  6 bits, 0: 5 - key - maximum 64 characters
+  //  2 bits, 6: 7 - reserved
+  // 24 bits, 8:31 - value - maximum 16MB
   uint32_t key_value_length_;
   uint32_t key_version_;
 };
diff --git a/pw_log_basic/log_basic.cc b/pw_log_basic/log_basic.cc
index a8f8c8d..c586830 100644
--- a/pw_log_basic/log_basic.cc
+++ b/pw_log_basic/log_basic.cc
@@ -40,6 +40,7 @@
 // TODO(pwbug/17): Expose these through the config system.
 #define PW_USE_EMOJIS 1
 #define PW_LOG_SHOW_FILENAME 0
+#define PW_LOG_SHOW_FUNCTION 0
 #define PW_LOG_SHOW_FLAG 0
 #define PW_LOG_SHOW_MODULE 0
 
@@ -98,17 +99,22 @@
                        const char* function_name,
                        const char* message,
                        ...) {
-  PW_UNUSED(function_name);
-  PW_UNUSED(line_number);
-
   // Accumulate the log message in this buffer, then output it.
   pw::StringBuffer<150> buffer;
 
   // Column: Filename
 #if PW_LOG_SHOW_FILENAME
-  buffer.Format(" %-30s |", GetFileBasename(file_name));
+  buffer.Format(" %-30s:%4d |", GetFileBasename(file_name), line_number);
 #else
   PW_UNUSED(file_name);
+  PW_UNUSED(line_number);
+#endif
+
+  // Column: Function
+#if PW_LOG_SHOW_FUNCTION
+  buffer.Format(" %20s |", function_name);
+#else
+  PW_UNUSED(function_name);
 #endif
 
   // Column: Module