pw_kvs: Handle key hash collisions

- Log and return ALREADY_EXISTS if a Put operation introduces a key
  whose hash collides with another key already known to the KVS.
- Add tests to cover key hash collisions with Put, Get, ValueSize, and
  Delete.

Change-Id: I957e7c4998f700517b70d3b999f0e3f4f4621f6d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index fd54654..85565b9 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -250,11 +250,7 @@
   TRY_WITH_SIZE(CheckOperation(key));
 
   const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindKeyDescriptor(key, &key_descriptor));
-
-  if (key_descriptor->deleted()) {
-    return StatusWithSize(Status::NOT_FOUND);
-  }
+  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
 
   Entry header;
   TRY_WITH_SIZE(ReadEntryHeader(key_descriptor->address, &header));
@@ -288,7 +284,9 @@
   }
 
   KeyDescriptor* key_descriptor;
-  if (FindKeyDescriptor(key, &key_descriptor).ok()) {
+  Status status = FindKeyDescriptor(key, &key_descriptor);
+
+  if (status.ok()) {
     DBG("Writing over existing entry for key 0x%08" PRIx32 " in sector %zu",
         key_descriptor->key_hash,
         SectorIndex(SectorFromAddress(key_descriptor->address)));
@@ -296,18 +294,18 @@
         key_descriptor, KeyDescriptor::kValid, key, value);
   }
 
-  return WriteEntryForNewKey(key, value);
+  if (status == Status::NOT_FOUND) {
+    return WriteEntryForNewKey(key, value);
+  }
+
+  return status;
 }
 
 Status KeyValueStore::Delete(string_view key) {
   TRY(CheckOperation(key));
 
   KeyDescriptor* key_descriptor;
-  TRY(FindKeyDescriptor(key, &key_descriptor));
-
-  if (key_descriptor->deleted()) {
-    return Status::NOT_FOUND;
-  }
+  TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
   DBG("Writing tombstone for existing key 0x%08" PRIx32 " in sector %zu",
       key_descriptor->key_hash,
@@ -363,11 +361,7 @@
   TRY_WITH_SIZE(CheckOperation(key));
 
   const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindKeyDescriptor(key, &key_descriptor));
-
-  if (key_descriptor->deleted()) {
-    return StatusWithSize(Status::NOT_FOUND);
-  }
+  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
 
   Entry header;
   TRY_WITH_SIZE(ReadEntryHeader(key_descriptor->address, &header));
@@ -413,6 +407,15 @@
   return Status::OK;
 }
 
+// Searches for a KeyDescriptor that matches this key and sets *result to point
+// to it if one is found.
+//
+//             OK: there is a matching descriptor and *result is set
+//      NOT_FOUND: there is no descriptor that matches this key, but this key
+//                 has a unique hash (and could potentially be added to the KVS)
+// ALREADY_EXISTS: there is no descriptor that matches this key, but the
+//                 key's hash collides with the hash for an existing descriptor
+//
 Status KeyValueStore::FindKeyDescriptor(string_view key,
                                         const KeyDescriptor** result) const {
   char key_buffer[kMaxKeyLength];
@@ -426,12 +429,34 @@
         DBG("Found match for key hash 0x%08" PRIx32, hash);
         *result = &descriptor;
         return Status::OK;
+      } else {
+        WRN("Found key hash collision for 0x%08" PRIx32, hash);
+        return Status::ALREADY_EXISTS;
       }
     }
   }
   return Status::NOT_FOUND;
 }
 
+// Searches for a KeyDescriptor that matches this key and sets *result to point
+// to it if one is found.
+//
+//          OK: there is a matching descriptor and *result is set
+//   NOT_FOUND: there is no descriptor that matches this key
+//
+Status KeyValueStore::FindExistingKeyDescriptor(
+    string_view key, const KeyDescriptor** result) const {
+  Status status = FindKeyDescriptor(key, result);
+
+  // If the key's hash collides with an existing key or if the key is deleted,
+  // treat it as if it is not in the KVS.
+  if (status == Status::ALREADY_EXISTS ||
+      (status.ok() && (*result)->deleted())) {
+    return Status::NOT_FOUND;
+  }
+  return status;
+}
+
 Status KeyValueStore::ReadEntryHeader(Address address, Entry* header) const {
   return partition_.Read(address, sizeof(*header), header).status();
 }
@@ -646,10 +671,7 @@
 Status KeyValueStore::FindOrRecoverSectorWithSpace(SectorDescriptor** sector,
                                                    size_t size) {
   Status result = FindSectorWithSpace(sector, size);
-  if (result.ok()) {
-    return result;
-  }
-  if (options_.partial_gc_on_write) {
+  if (result == Status::RESOURCE_EXHAUSTED && options_.partial_gc_on_write) {
     // Garbage collect and then try again to find the best sector.
     TRY(GarbageCollectOneSector());
     return FindSectorWithSpace(sector, size);
diff --git a/pw_kvs/key_value_store_test.cc b/pw_kvs/key_value_store_test.cc
index 70e5db1..b608b8f 100644
--- a/pw_kvs/key_value_store_test.cc
+++ b/pw_kvs/key_value_store_test.cc
@@ -297,6 +297,43 @@
   EXPECT_TRUE(kvs_.empty());
 }
 
+TEST_F(EmptyInitializedKvs, Collision_WithPresentKey) {
+  // Both hash to 0x19df36f0.
+  constexpr std::string_view key1 = "D4";
+  constexpr std::string_view key2 = "dFU6S";
+
+  ASSERT_EQ(Status::OK, kvs_.Put(key1, 1000));
+
+  EXPECT_EQ(Status::ALREADY_EXISTS, kvs_.Put(key2, 999));
+
+  int value = 0;
+  EXPECT_EQ(Status::OK, kvs_.Get(key1, &value));
+  EXPECT_EQ(1000, value);
+
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get(key2, &value));
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.ValueSize(key2).status());
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Delete(key2));
+}
+
+TEST_F(EmptyInitializedKvs, Collision_WithDeletedKey) {
+  // Both hash to 0x4060f732.
+  constexpr std::string_view key1 = "1U2";
+  constexpr std::string_view key2 = "ahj9d";
+
+  ASSERT_EQ(Status::OK, kvs_.Put(key1, 1000));
+  ASSERT_EQ(Status::OK, kvs_.Delete(key1));
+
+  // key2 collides with key1's tombstone.
+  EXPECT_EQ(Status::ALREADY_EXISTS, kvs_.Put(key2, 999));
+
+  int value = 0;
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get(key1, &value));
+
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Get(key2, &value));
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.ValueSize(key2).status());
+  EXPECT_EQ(Status::NOT_FOUND, kvs_.Delete(key2));
+}
+
 TEST_F(EmptyInitializedKvs, Iteration_Empty_ByReference) {
   for (const KeyValueStore::Item& entry : kvs_) {
     FAIL();  // The KVS is empty; this shouldn't execute.
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index e2777dc..2f1c91a 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -121,6 +121,20 @@
     return FixedSizeGet(key, reinterpret_cast<std::byte*>(pointer), sizeof(T));
   }
 
+  // Adds a key-value entry to the KVS. If the key was already present, its
+  // value is overwritten.
+  //
+  // In the current implementation, all keys in the KVS must have a unique hash.
+  // If Put is called with a key whose hash matches an existing key, nothing
+  // is added and ALREADY_EXISTS is returned.
+  //
+  //                    OK: the entry was successfully added or updated
+  //   FAILED_PRECONDITION: the KVS is not initialized
+  //    RESOURCE_EXHAUSTED: there is not enough space to add the entry
+  //      INVALID_ARGUMENT: key is empty or too long or value is too large
+  //        ALREADY_EXISTS: the entry could not be added because a different key
+  //                        with the same hash is already in the KVS
+  //
   Status Put(std::string_view key, span<const std::byte> value);
 
   template <typename T,
@@ -278,6 +292,15 @@
         key, const_cast<const KeyDescriptor**>(result));
   }
 
+  Status FindExistingKeyDescriptor(std::string_view key,
+                                   const KeyDescriptor** result) const;
+
+  Status FindExistingKeyDescriptor(std::string_view key,
+                                   KeyDescriptor** result) {
+    return static_cast<const KeyValueStore&>(*this).FindExistingKeyDescriptor(
+        key, const_cast<const KeyDescriptor**>(result));
+  }
+
   Status ReadEntryHeader(Address address, Entry* header) const;
   Status ReadEntryKey(Address address, size_t key_length, char* key) const;