pw_kvs: Add better error handling during read, write, and init
- When reading a key entry, use all of the redundant copies before
failing.
- Add CopyEntryToSector helper that makes a copy of a key to another
sector.
- Move FindExisting method and add wrapper for Find from EntryCache in
mto KVS to allow the methods to report errors that are found.
Change-Id: Ia2da21e4ec3ad164b8620aa3949c524c0d916835
diff --git a/pw_kvs/entry_cache.cc b/pw_kvs/entry_cache.cc
index 6d80bb6..4528ab2 100644
--- a/pw_kvs/entry_cache.cc
+++ b/pw_kvs/entry_cache.cc
@@ -58,42 +58,62 @@
addresses_ = addresses_.first(1);
}
-Status EntryCache::Find(FlashPartition& partition,
- string_view key,
- EntryMetadata* metadata) const {
+StatusWithSize EntryCache::Find(FlashPartition& partition,
+ const Sectors& sectors,
+ const EntryFormats& formats,
+ string_view key,
+ EntryMetadata* metadata) const {
const uint32_t hash = internal::Hash(key);
Entry::KeyBuffer key_buffer;
+ bool error_detected = false;
for (size_t i = 0; i < descriptors_.size(); ++i) {
if (descriptors_[i].key_hash == hash) {
- TRY(Entry::ReadKey(
- partition, *first_address(i), key.size(), key_buffer.data()));
+ bool key_found = false;
+ string_view read_key;
- if (key == string_view(key_buffer.data(), key.size())) {
+ for (Address address : addresses(i)) {
+ Status read_result =
+ Entry::ReadKey(partition, address, key.size(), key_buffer.data());
+
+ read_key = string_view(key_buffer.data(), key.size());
+
+ if (read_result.ok() && hash == internal::Hash(read_key)) {
+ key_found = true;
+ break;
+ } else {
+ // A hash mismatch can be caused by reading invalid data or a key hash
+ // collision of keys with differing size. To verify the data read from
+ // flash is good, validate the entry.
+ Entry entry;
+ read_result = Entry::Read(partition, address, formats, &entry);
+ if (read_result.ok() && entry.VerifyChecksumInFlash().ok()) {
+ key_found = true;
+ break;
+ }
+
+ PW_LOG_WARN(
+ " Found corrupt entry, invalidating this copy of the key");
+ error_detected = true;
+ sectors.FromAddress(address).mark_corrupt();
+ }
+ }
+ size_t error_val = error_detected ? 1 : 0;
+
+ if (!key_found) {
+ PW_LOG_ERROR("No valid entries for key. Data has been lost!");
+ return StatusWithSize(Status::DATA_LOSS, error_val);
+ } else if (key == read_key) {
PW_LOG_DEBUG("Found match for key hash 0x%08" PRIx32, hash);
*metadata = EntryMetadata(descriptors_[i], addresses(i));
- return Status::OK;
+ return StatusWithSize(Status::OK, error_val);
} else {
PW_LOG_WARN("Found key hash collision for 0x%08" PRIx32, hash);
- return Status::ALREADY_EXISTS;
+ return StatusWithSize(Status::ALREADY_EXISTS, error_val);
}
}
}
- return Status::NOT_FOUND;
-}
-
-Status EntryCache::FindExisting(FlashPartition& partition,
- string_view key,
- EntryMetadata* metadata) const {
- Status status = Find(partition, key, metadata);
-
- // If the key's hash collides with an existing key or if the key is deleted,
- // treat it as if it is not in the KVS.
- if (status == Status::ALREADY_EXISTS ||
- (status.ok() && metadata->state() == EntryState::kDeleted)) {
- return Status::NOT_FOUND;
- }
- return status;
+ return StatusWithSize::NOT_FOUND;
}
EntryMetadata EntryCache::AddNew(const KeyDescriptor& descriptor,
diff --git a/pw_kvs/entry_cache_test.cc b/pw_kvs/entry_cache_test.cc
index 82678fb..76bf5dc 100644
--- a/pw_kvs/entry_cache_test.cc
+++ b/pw_kvs/entry_cache_test.cc
@@ -194,6 +194,8 @@
EXPECT_EQ(99u, it->first_address());
}
+constexpr size_t kSectorSize = 64;
+
constexpr auto kTheEntry = AsBytes(uint32_t(12345), // magic
uint32_t(0), // checksum
uint8_t(0), // alignment (16 B)
@@ -201,7 +203,9 @@
uint16_t(0), // value size
uint32_t(123), // transaction ID
ByteStr(kTheKey));
-constexpr std::array<byte, 16 - kTheEntry.size() % 16> kPadding1{};
+constexpr std::array<byte, kSectorSize - kTheEntry.size() % kSectorSize>
+ kPadding1{};
+constexpr size_t kSize1 = kTheEntry.size() + kPadding1.size();
constexpr char kCollision1[] = "9FDC";
constexpr char kCollision2[] = "axzzK";
@@ -214,7 +218,9 @@
uint16_t(0), // value size
uint32_t(123), // transaction ID
ByteStr(kCollision1));
-constexpr std::array<byte, 16 - kCollisionEntry.size() % 16> kPadding2{};
+constexpr std::array<byte, kSectorSize - kCollisionEntry.size() % kSectorSize>
+ kPadding2{};
+constexpr size_t kSize2 = kCollisionEntry.size() + kPadding2.size();
constexpr auto kDeletedEntry =
AsBytes(uint32_t(12345), // magic
@@ -224,29 +230,65 @@
uint16_t(0xffff), // value size (deleted)
uint32_t(123), // transaction ID
ByteStr("delorted"));
+constexpr std::array<byte, kSectorSize - kDeletedEntry.size() % kSectorSize>
+ kPadding3{};
+
+constexpr EntryFormat kFormat{.magic = uint32_t(12345), .checksum = nullptr};
class InitializedEntryCache : public EmptyEntryCache {
protected:
static_assert(Hash(kCollision1) == Hash(kCollision2));
InitializedEntryCache()
- : flash_(AsBytes(
- kTheEntry, kPadding1, kCollisionEntry, kPadding2, kDeletedEntry)),
- partition_(&flash_) {
- entries_.AddNew(kDescriptor, 0);
+ : flash_(AsBytes(kTheEntry,
+ kPadding1,
+ kTheEntry,
+ kPadding1,
+ kCollisionEntry,
+ kPadding2,
+ kDeletedEntry,
+ kPadding3)),
+ partition_(&flash_),
+ sectors_(sector_descriptors_, partition_, nullptr),
+ format_(kFormat) {
+ sectors_.Reset();
+ size_t address = 0;
+ auto entry = entries_.AddNew(kDescriptor, address);
+
+ address += kSize1;
+ entry.AddNewAddress(kSize1);
+
+ address += kSize1;
entries_.AddNew({.key_hash = Hash(kCollision1),
.transaction_id = 125,
.state = EntryState::kDeleted},
- kTheEntry.size() + kPadding1.size());
+ address);
+
+ address += kSize2;
entries_.AddNew({.key_hash = Hash("delorted"),
.transaction_id = 256,
.state = EntryState::kDeleted},
- kTheEntry.size() + kPadding1.size() +
- kCollisionEntry.size() + kPadding2.size());
+ address);
}
- FakeFlashBuffer<64, 128> flash_;
+ void CheckForCorruptSectors(SectorDescriptor* sector1 = nullptr,
+ SectorDescriptor* sector2 = nullptr) {
+ for (auto& sector : sectors_) {
+ bool expect_corrupt =
+ (§or == sector1 || §or == sector2) ? true : false;
+
+ EXPECT_EQ(expect_corrupt, sector.corrupt());
+ }
+ }
+
+ static constexpr size_t kTotalSectors = 128;
+ FakeFlashBuffer<kSectorSize, kTotalSectors> flash_;
FlashPartition partition_;
+
+ Vector<SectorDescriptor, kTotalSectors> sector_descriptors_;
+ Sectors sectors_;
+
+ EntryFormats format_;
};
TEST_F(InitializedEntryCache, EntryCounts) {
@@ -265,51 +307,80 @@
TEST_F(InitializedEntryCache, Find_PresentEntry) {
EntryMetadata metadata;
- ASSERT_EQ(Status::OK, entries_.Find(partition_, kTheKey, &metadata));
+
+ StatusWithSize result =
+ entries_.Find(partition_, sectors_, format_, kTheKey, &metadata);
+
+ ASSERT_EQ(Status::OK, result.status());
+ EXPECT_EQ(0u, result.size());
EXPECT_EQ(Hash(kTheKey), metadata.hash());
EXPECT_EQ(EntryState::kValid, metadata.state());
+ CheckForCorruptSectors();
+}
+
+TEST_F(InitializedEntryCache, Find_PresentEntryWithSingleReadError) {
+ // Inject 2 read errors so that the initial key read and the follow-up full
+ // read of the first entry fail.
+ flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 2));
+
+ EntryMetadata metadata;
+
+ StatusWithSize result =
+ entries_.Find(partition_, sectors_, format_, kTheKey, &metadata);
+
+ ASSERT_EQ(Status::OK, result.status());
+ EXPECT_EQ(1u, result.size());
+ EXPECT_EQ(Hash(kTheKey), metadata.hash());
+ EXPECT_EQ(EntryState::kValid, metadata.state());
+ CheckForCorruptSectors(§ors_.FromAddress(0));
+}
+
+TEST_F(InitializedEntryCache, Find_PresentEntryWithMultiReadError) {
+ flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 4));
+
+ EntryMetadata metadata;
+
+ StatusWithSize result =
+ entries_.Find(partition_, sectors_, format_, kTheKey, &metadata);
+
+ ASSERT_EQ(Status::DATA_LOSS, result.status());
+ EXPECT_EQ(1u, result.size());
+ CheckForCorruptSectors(§ors_.FromAddress(0),
+ §ors_.FromAddress(kSize1));
}
TEST_F(InitializedEntryCache, Find_DeletedEntry) {
EntryMetadata metadata;
- ASSERT_EQ(Status::OK, entries_.Find(partition_, "delorted", &metadata));
+
+ StatusWithSize result =
+ entries_.Find(partition_, sectors_, format_, "delorted", &metadata);
+
+ ASSERT_EQ(Status::OK, result.status());
+ EXPECT_EQ(0u, result.size());
EXPECT_EQ(Hash("delorted"), metadata.hash());
EXPECT_EQ(EntryState::kDeleted, metadata.state());
+ CheckForCorruptSectors();
}
TEST_F(InitializedEntryCache, Find_MissingEntry) {
EntryMetadata metadata;
- ASSERT_EQ(Status::NOT_FOUND, entries_.Find(partition_, "3.141", &metadata));
+
+ StatusWithSize result =
+ entries_.Find(partition_, sectors_, format_, "3.141", &metadata);
+
+ ASSERT_EQ(Status::NOT_FOUND, result.status());
+ EXPECT_EQ(0u, result.size());
+ CheckForCorruptSectors();
}
TEST_F(InitializedEntryCache, Find_Collision) {
EntryMetadata metadata;
- EXPECT_EQ(Status::ALREADY_EXISTS,
- entries_.Find(partition_, kCollision2, &metadata));
-}
-TEST_F(InitializedEntryCache, FindExisting_PresentEntry) {
- EntryMetadata metadata;
- ASSERT_EQ(Status::OK, entries_.FindExisting(partition_, kTheKey, &metadata));
- EXPECT_EQ(Hash(kTheKey), metadata.hash());
-}
-
-TEST_F(InitializedEntryCache, FindExisting_DeletedEntry) {
- EntryMetadata metadata;
- ASSERT_EQ(Status::NOT_FOUND,
- entries_.FindExisting(partition_, "delorted", &metadata));
-}
-
-TEST_F(InitializedEntryCache, FindExisting_MissingEntry) {
- EntryMetadata metadata;
- ASSERT_EQ(Status::NOT_FOUND,
- entries_.FindExisting(partition_, "3.141", &metadata));
-}
-
-TEST_F(InitializedEntryCache, FindExisting_Collision) {
- EntryMetadata metadata;
- EXPECT_EQ(Status::NOT_FOUND,
- entries_.FindExisting(partition_, kCollision2, &metadata));
+ StatusWithSize result =
+ entries_.Find(partition_, sectors_, format_, kCollision2, &metadata);
+ EXPECT_EQ(Status::ALREADY_EXISTS, result.status());
+ EXPECT_EQ(0u, result.size());
+ CheckForCorruptSectors();
}
} // namespace
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 9fa7199..2bc6ddf 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -56,7 +56,6 @@
Status KeyValueStore::Init() {
initialized_ = InitializationState::kNotInitialized;
error_detected_ = false;
- error_stats_ = {};
last_transaction_id_ = 0;
sectors_.Reset();
entry_cache_.Reset();
@@ -174,17 +173,39 @@
DBG("Second pass: Count valid bytes in each sector");
Address newest_key = 0;
- // For every valid entry, count the valid bytes in that sector. Track which
- // entry has the newest transaction ID for initializing last_new_sector_.
- for (const EntryMetadata& metadata : entry_cache_) {
+ // For every valid entry, for each address, count the valid bytes in that
+ // sector. If the address fails to read, remove the address and mark the
+ // sector as corrupt. Track which entry has the newest transaction ID for
+ // initializing last_new_sector_.
+ for (EntryMetadata& metadata : entry_cache_) {
if (metadata.addresses().size() < redundancy()) {
error_detected_ = true;
}
- for (Address address : metadata.addresses()) {
+ size_t index = 0;
+ while (index < metadata.addresses().size()) {
+ Address address = metadata.addresses()[index];
Entry entry;
- TRY(Entry::Read(partition_, address, formats_, &entry));
- sectors_.FromAddress(address).AddValidBytes(entry.size());
+
+ Status read_result = Entry::Read(partition_, address, formats_, &entry);
+
+ SectorDescriptor& sector = sectors_.FromAddress(address);
+
+ if (read_result.ok()) {
+ sector.AddValidBytes(entry.size());
+ index++;
+ } else {
+ corrupt_entries++;
+ total_corrupt_bytes += sector.writable_bytes();
+ error_detected_ = true;
+ sector.mark_corrupt();
+
+ // Remove the bad address and stay at this index. The removal
+ // replaces out the removed address with the back address so
+ // this index needs to be rechecked with the new address.
+ metadata.RemoveAddress(address);
+ }
}
+
if (metadata.IsNewerThan(last_transaction_id_)) {
last_transaction_id_ = metadata.transaction_id();
newest_key = metadata.addresses().back();
@@ -201,14 +222,15 @@
initialized_ = InitializationState::kReady;
} else {
if (options_.recovery != ErrorRecovery::kManual) {
- WRN("KVS init: Corruption detected, beginning repair");
+ WRN("KVS init: Corruption detected, beginning repair. Found %zu corrupt "
+ "bytes and %d corrupt entries.",
+ total_corrupt_bytes,
+ corrupt_entries);
Status recovery_status = Repair();
if (recovery_status.ok()) {
WRN("KVS init: Corruption detected and fully repaired");
initialized_ = InitializationState::kReady;
- total_corrupt_bytes = 0;
- corrupt_entries = 0;
} else if (recovery_status == Status::RESOURCE_EXHAUSTED) {
WRN("KVS init: Unable to maintain required free sector");
initialized_ = InitializationState::kNeedsMaintenance;
@@ -230,11 +252,9 @@
partition_.sector_size_bytes());
// Report any corruption was not repaired.
- if (total_corrupt_bytes > 0) {
- WRN("Found %zu corrupt bytes and %d corrupt entries during init process; "
- "some keys may be missing",
- total_corrupt_bytes,
- corrupt_entries);
+ if (error_detected_) {
+ WRN("KVS init: Corruption found but not repaired, KVS unavailable until "
+ "successful maintenance.");
return Status::DATA_LOSS;
}
@@ -269,12 +289,14 @@
return stats;
}
+// Check KVS for any error conditions. Primarily intended for test and
+// internal use.
bool KeyValueStore::CheckForErrors() {
// Check for corrupted sectors
for (SectorDescriptor& sector : sectors_) {
if (sector.corrupt()) {
error_detected_ = true;
- break;
+ return error_detected();
}
}
@@ -283,7 +305,7 @@
for (const EntryMetadata& metadata : entry_cache_) {
if (metadata.addresses().size() < redundancy()) {
error_detected_ = true;
- break;
+ return error_detected();
}
}
}
@@ -343,7 +365,7 @@
TRY_WITH_SIZE(CheckReadOperation(key));
EntryMetadata metadata;
- TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
+ TRY_WITH_SIZE(FindExisting(key, &metadata));
return Get(key, metadata, value_buffer, offset_bytes);
}
@@ -362,7 +384,7 @@
}
EntryMetadata metadata;
- Status status = entry_cache_.Find(partition_, key, &metadata);
+ Status status = FindEntry(key, &metadata);
if (status.ok()) {
// TODO: figure out logging how to support multiple addresses.
@@ -384,7 +406,7 @@
TRY(CheckWriteOperation(key));
EntryMetadata metadata;
- TRY(entry_cache_.FindExisting(partition_, key, &metadata));
+ TRY(FindExisting(key, &metadata));
// TODO: figure out logging how to support multiple addresses.
DBG("Writing tombstone for key 0x%08" PRIx32 " in %zu sectors including %u",
@@ -398,11 +420,7 @@
key_buffer_.fill('\0');
Entry entry;
- // TODO: add support for using one of the redundant entries if reading the
- // first copy fails.
- if (Entry::Read(
- kvs_.partition_, iterator_->first_address(), kvs_.formats_, &entry)
- .ok()) {
+ if (kvs_.ReadEntry(*iterator_, entry).ok()) {
entry.ReadKey(key_buffer_);
}
}
@@ -429,20 +447,61 @@
TRY_WITH_SIZE(CheckReadOperation(key));
EntryMetadata metadata;
- TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
+ TRY_WITH_SIZE(FindExisting(key, &metadata));
return ValueSize(metadata);
}
+Status KeyValueStore::ReadEntry(const EntryMetadata& metadata,
+ Entry& entry) const {
+ // Try to read an entry
+ Status read_result = Status::DATA_LOSS;
+ for (Address address : metadata.addresses()) {
+ read_result = Entry::Read(partition_, address, formats_, &entry);
+ if (read_result.ok()) {
+ return read_result;
+ }
+
+ // Found a bad address. Set the sector as corrupt.
+ error_detected_ = true;
+ sectors_.FromAddress(address).mark_corrupt();
+ }
+
+ ERR("No valid entries for key. Data has been lost!");
+ return read_result;
+}
+
+Status KeyValueStore::FindEntry(string_view key,
+ EntryMetadata* found_entry) const {
+ StatusWithSize find_result =
+ entry_cache_.Find(partition_, sectors_, formats_, key, found_entry);
+
+ if (find_result.size() > 0u) {
+ error_detected_ = true;
+ }
+ return find_result.status();
+}
+
+Status KeyValueStore::FindExisting(string_view key,
+ EntryMetadata* metadata) const {
+ Status status = FindEntry(key, metadata);
+
+ // If the key's hash collides with an existing key or if the key is deleted,
+ // treat it as if it is not in the KVS.
+ if (status == Status::ALREADY_EXISTS ||
+ (status.ok() && metadata->state() == EntryState::kDeleted)) {
+ return Status::NOT_FOUND;
+ }
+ return status;
+}
+
StatusWithSize KeyValueStore::Get(string_view key,
const EntryMetadata& metadata,
span<std::byte> value_buffer,
size_t offset_bytes) const {
Entry entry;
- // TODO: add support for using one of the redundant entries if reading the
- // first copy fails.
- TRY_WITH_SIZE(
- Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+
+ TRY_WITH_SIZE(ReadEntry(metadata, entry));
StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
@@ -464,7 +523,7 @@
TRY(CheckWriteOperation(key));
EntryMetadata metadata;
- TRY(entry_cache_.FindExisting(partition_, key, &metadata));
+ TRY(FindExisting(key, &metadata));
return FixedSizeGet(key, metadata, value, size_bytes);
}
@@ -490,10 +549,7 @@
StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
Entry entry;
- // TODO: add support for using one of the redundant entries if reading the
- // first copy fails.
- TRY_WITH_SIZE(
- Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+ TRY_WITH_SIZE(ReadEntry(metadata, entry));
return StatusWithSize(entry.value_size());
}
@@ -529,9 +585,7 @@
span<const byte> value) {
// Read the original entry to get the size for sector accounting purposes.
Entry entry;
- // TODO: add support for using one of the redundant entries if reading the
- // first copy fails.
- TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+ TRY(ReadEntry(metadata, entry));
return WriteEntry(key, value, new_state, &metadata, entry.size());
}
@@ -668,10 +722,7 @@
span<const byte> value) {
const StatusWithSize result = entry.Write(key, value);
- // Remove any bytes that were written, even if the write was not successful.
- // This is important to retain the writable space invariant on the sectors.
SectorDescriptor& sector = sectors_.FromAddress(entry.address());
- sector.RemoveWritableBytes(result.size());
if (!result.ok()) {
ERR("Failed to write %zu bytes at %#zx. %zu actually written",
@@ -685,15 +736,36 @@
TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), §or));
}
+ sector.RemoveWritableBytes(result.size());
sector.AddValidBytes(result.size());
return Status::OK;
}
+StatusWithSize KeyValueStore::CopyEntryToSector(Entry& entry,
+ SectorDescriptor* new_sector,
+ Address& new_address) {
+ new_address = sectors_.NextWritableAddress(*new_sector);
+ const StatusWithSize result = entry.Copy(new_address);
+
+ TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+ if (options_.verify_on_write) {
+ TRY_WITH_SIZE(
+ MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+ }
+ // Entry was written successfully; update descriptor's address and the sector
+ // descriptors to reflect the new entry.
+ new_sector->RemoveWritableBytes(result.size());
+ new_sector->AddValidBytes(result.size());
+
+ return result;
+}
+
Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
KeyValueStore::Address& address,
span<const Address> reserved_addresses) {
Entry entry;
- TRY(Entry::Read(partition_, address, formats_, &entry));
+ TRY(ReadEntry(metadata, entry));
// Find a new sector for the entry and write it to the new location. For
// relocation the find should not not be a sector already containing the key
@@ -706,19 +778,10 @@
TRY(sectors_.FindSpaceDuringGarbageCollection(
&new_sector, entry.size(), metadata.addresses(), reserved_addresses));
- const Address new_address = sectors_.NextWritableAddress(*new_sector);
- const StatusWithSize result = entry.Copy(new_address);
-
- TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
-
- if (options_.verify_on_write) {
- TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
- }
- // Entry was written successfully; update descriptor's address and the sector
- // descriptors to reflect the new entry.
- new_sector->RemoveWritableBytes(result.size());
- new_sector->AddValidBytes(result.size());
- sectors_.FromAddress(address).RemoveValidBytes(result.size());
+ Address new_address;
+ TRY_ASSIGN(const size_t result_size,
+ CopyEntryToSector(entry, new_sector, new_address));
+ sectors_.FromAddress(address).RemoveValidBytes(result_size);
address = new_address;
return Status::OK;
@@ -730,6 +793,7 @@
}
DBG("Do full maintenance");
+ CheckForErrors();
if (error_detected_) {
TRY(Repair());
@@ -784,7 +848,7 @@
Status KeyValueStore::RelocateKeyAddressesInSector(
SectorDescriptor& sector_to_gc,
- const EntryMetadata& metadata,
+ EntryMetadata& metadata,
span<const Address> reserved_addresses) {
for (FlashPartition::Address& address : metadata.addresses()) {
if (sectors_.AddressInSector(sector_to_gc, address)) {
@@ -803,7 +867,7 @@
DBG(" Garbage Collect sector %u", sectors_.Index(sector_to_gc));
// Step 1: Move any valid entries in the GC sector to other sectors
if (sector_to_gc.valid_bytes() != 0) {
- for (const EntryMetadata& metadata : entry_cache_) {
+ for (EntryMetadata& metadata : entry_cache_) {
TRY(RelocateKeyAddressesInSector(
sector_to_gc, metadata, reserved_addresses));
}
@@ -831,10 +895,7 @@
Entry entry;
- // For simplicity use just the first copy. Any known bad copies should have
- // been removed already.
- // TODO: Add support to read other copies if needed.
- TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+ TRY(ReadEntry(metadata, entry));
TRY(entry.VerifyChecksumInFlash());
for (size_t i = metadata.addresses().size();
@@ -842,17 +903,8 @@
i++) {
TRY(sectors_.FindSpace(&new_sector, entry.size(), metadata.addresses()));
- const Address new_address = sectors_.NextWritableAddress(*new_sector);
- const StatusWithSize result = entry.Copy(new_address);
- TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
-
- if (options_.verify_on_write) {
- TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
- }
- // Entry was written successfully; update descriptor's address and the
- // sector descriptors to reflect the new entry.
- new_sector->RemoveWritableBytes(result.size());
- new_sector->AddValidBytes(result.size());
+ Address new_address;
+ TRY(CopyEntryToSector(entry, new_sector, new_address));
metadata.AddNewAddress(new_address);
}
@@ -922,14 +974,14 @@
Status repair_status = Status::OK;
if (redundancy() == 1) {
- DBG(" Redundancy not in use, nothing to check");
+ DBG(" Redundancy not in use, nothting to check");
return Status::OK;
}
- DBG(" Write any needed additional duplicate copies of key to fulfill %u "
- "redundancy",
+ DBG(" Write any needed additional duplicate copies of keys to fulfill %u"
+ " redundancy",
unsigned(redundancy()));
- for (const EntryMetadata& metadata : entry_cache_) {
+ for (EntryMetadata& metadata : entry_cache_) {
if (metadata.addresses().size() >= redundancy()) {
continue;
}
@@ -937,10 +989,7 @@
DBG(" Key with %u of %u copies found, adding missing copies",
unsigned(metadata.addresses().size()),
unsigned(redundancy()));
- // TODO: Add non-const iterator to the entry_cache_ and remove this
- // const_cast.
- Status fill_status =
- AddRedundantEntries(const_cast<EntryMetadata&>(metadata));
+ Status fill_status = AddRedundantEntries(metadata);
if (fill_status.ok()) {
error_stats_.missing_redundant_entries_recovered += 1;
DBG(" Key missing copies added");
diff --git a/pw_kvs/key_value_store_binary_format_test.cc b/pw_kvs/key_value_store_binary_format_test.cc
index 8042712..3119edc 100644
--- a/pw_kvs/key_value_store_binary_format_test.cc
+++ b/pw_kvs/key_value_store_binary_format_test.cc
@@ -279,6 +279,9 @@
EXPECT_EQ(32u, kvs_.GetStorageStats().in_use_bytes);
}
+// The Put_WriteFailure_EntryNotAddedButBytesMarkedWritten test is run with both
+// the KvsErrorRecovery and KvsErrorHandling test fixtures (different KVS
+// configurations).
TEST_F(KvsErrorHandling, Put_WriteFailure_EntryNotAddedButBytesMarkedWritten) {
ASSERT_EQ(Status::OK, kvs_.Init());
flash_.InjectWriteError(FlashError::Unconditional(Status::UNAVAILABLE, 1));
@@ -359,7 +362,7 @@
ASSERT_EQ(32u, stats.in_use_bytes);
// The sector with corruption should have been recovered.
ASSERT_EQ(0u, stats.reclaimable_bytes);
- ASSERT_EQ(1u, stats.corrupt_sectors_recovered);
+ ASSERT_EQ(i + 1u, stats.corrupt_sectors_recovered);
}
}
@@ -467,6 +470,9 @@
EXPECT_EQ(32u, kvs_.GetStorageStats().in_use_bytes);
}
+// The Put_WriteFailure_EntryNotAddedButBytesMarkedWritten test is run with both
+// the KvsErrorRecovery and KvsErrorHandling test fixtures (different KVS
+// configurations).
TEST_F(KvsErrorRecovery, Put_WriteFailure_EntryNotAddedButBytesMarkedWritten) {
ASSERT_EQ(Status::OK, kvs_.Init());
flash_.InjectWriteError(FlashError::Unconditional(Status::UNAVAILABLE, 1));
@@ -561,6 +567,145 @@
ASSERT_CONTAINS_ENTRY("kee", "O_o");
}
+TEST_F(InitializedMultiMagicKvs, RecoversLossOfFirstSector) {
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 0u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+ EXPECT_EQ(Status::OK, partition_.Erase(0, 1));
+
+ ASSERT_CONTAINS_ENTRY("key1", "value1");
+ ASSERT_CONTAINS_ENTRY("k2", "value2");
+ ASSERT_CONTAINS_ENTRY("k3y", "value3");
+ ASSERT_CONTAINS_ENTRY("A Key", "XD");
+ ASSERT_CONTAINS_ENTRY("kee", "O_o");
+
+ EXPECT_EQ(true, kvs_.error_detected());
+
+ stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 352u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 2 - (160 * (kvs_.redundancy() - 1)));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+ EXPECT_EQ(Status::OK, kvs_.FullMaintenance());
+ stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 0u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 1u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+}
+
+TEST_F(InitializedMultiMagicKvs, RecoversLossOfSecondSector) {
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 0u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+ EXPECT_EQ(Status::OK, partition_.Erase(partition_.sector_size_bytes(), 1));
+
+ ASSERT_CONTAINS_ENTRY("key1", "value1");
+ ASSERT_CONTAINS_ENTRY("k2", "value2");
+ ASSERT_CONTAINS_ENTRY("k3y", "value3");
+ ASSERT_CONTAINS_ENTRY("A Key", "XD");
+ ASSERT_CONTAINS_ENTRY("kee", "O_o");
+
+ EXPECT_EQ(false, kvs_.error_detected());
+
+ EXPECT_EQ(false, kvs_.Init());
+ stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 0u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 3 - (160 * kvs_.redundancy()));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 10u);
+}
+
+TEST_F(InitializedMultiMagicKvs, SingleReadErrors) {
+ // Inject 2 read errors, so the first read attempt fully fails.
+ flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 2));
+
+ flash_.InjectReadError(FlashError::Unconditional(Status::INTERNAL, 1, 7));
+
+ ASSERT_CONTAINS_ENTRY("key1", "value1");
+ ASSERT_CONTAINS_ENTRY("k2", "value2");
+ ASSERT_CONTAINS_ENTRY("k3y", "value3");
+ ASSERT_CONTAINS_ENTRY("A Key", "XD");
+ ASSERT_CONTAINS_ENTRY("kee", "O_o");
+
+ EXPECT_EQ(true, kvs_.error_detected());
+
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 352u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 2 - (160 * (kvs_.redundancy() - 1)));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+}
+
+TEST_F(InitializedMultiMagicKvs, SingleWriteError) {
+ flash_.InjectWriteError(FlashError::Unconditional(Status::INTERNAL, 1, 1));
+
+ EXPECT_EQ(Status::INTERNAL, kvs_.Put("new key", ByteStr("abcd?")));
+
+ EXPECT_EQ(true, kvs_.error_detected());
+
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, 32 + (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 352u);
+ EXPECT_EQ(stats.writable_bytes,
+ 512u * 2 - 32 - (160 * (kvs_.redundancy() - 1)));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+
+ char val[20] = {};
+ EXPECT_EQ(Status::OK,
+ kvs_.Get("new key", as_writable_bytes(span(val))).status());
+
+ EXPECT_EQ(Status::OK, kvs_.FullMaintenance());
+ stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (192u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 0u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 3 - (192 * kvs_.redundancy()));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 1u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 6u);
+
+ EXPECT_EQ(Status::OK,
+ kvs_.Get("new key", as_writable_bytes(span(val))).status());
+}
+
+TEST_F(InitializedMultiMagicKvs, DataLossAfterLosingBothCopies) {
+ EXPECT_EQ(Status::OK, partition_.Erase(0, 2));
+
+ char val[20] = {};
+ EXPECT_EQ(Status::DATA_LOSS,
+ kvs_.Get("key1", as_writable_bytes(span(val))).status());
+ EXPECT_EQ(Status::DATA_LOSS,
+ kvs_.Get("k2", as_writable_bytes(span(val))).status());
+ EXPECT_EQ(Status::DATA_LOSS,
+ kvs_.Get("k3y", as_writable_bytes(span(val))).status());
+ EXPECT_EQ(Status::DATA_LOSS,
+ kvs_.Get("A Key", as_writable_bytes(span(val))).status());
+ EXPECT_EQ(Status::DATA_LOSS,
+ kvs_.Get("kee", as_writable_bytes(span(val))).status());
+
+ EXPECT_EQ(true, kvs_.error_detected());
+
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (160u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 2 * 352u);
+ EXPECT_EQ(stats.writable_bytes, 512u);
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 5u);
+}
+
TEST_F(InitializedMultiMagicKvs, PutNewEntry_UsesFirstFormat) {
EXPECT_EQ(Status::OK, kvs_.Put("new key", ByteStr("abcd?")));
diff --git a/pw_kvs/public/pw_kvs/internal/entry_cache.h b/pw_kvs/public/pw_kvs/internal/entry_cache.h
index 1cbb47f..5ed2f15 100644
--- a/pw_kvs/public/pw_kvs/internal/entry_cache.h
+++ b/pw_kvs/public/pw_kvs/internal/entry_cache.h
@@ -20,7 +20,9 @@
#include "pw_containers/vector.h"
#include "pw_kvs/flash_memory.h"
+#include "pw_kvs/format.h"
#include "pw_kvs/internal/key_descriptor.h"
+#include "pw_kvs/internal/sectors.h"
#include "pw_span/span.h"
namespace pw::kvs::internal {
@@ -161,17 +163,9 @@
// key's hash collides with the hash for an existing
// descriptor
//
- Status Find(FlashPartition& partition,
- std::string_view key,
- EntryMetadata* metadata) const;
-
- // Searches for a KeyDescriptor that matches this key and sets *metadata to
- // point to it if one is found.
- //
- // OK: there is a matching descriptor and *metadata is set
- // NOT_FOUND: there is no descriptor that matches this key
- //
- Status FindExisting(FlashPartition& partition,
+ StatusWithSize Find(FlashPartition& partition,
+ const Sectors& sectors,
+ const EntryFormats& formats,
std::string_view key,
EntryMetadata* metadata) const;
diff --git a/pw_kvs/public/pw_kvs/internal/sectors.h b/pw_kvs/public/pw_kvs/internal/sectors.h
index e871f0b..20cd61f 100644
--- a/pw_kvs/public/pw_kvs/internal/sectors.h
+++ b/pw_kvs/public/pw_kvs/internal/sectors.h
@@ -143,15 +143,9 @@
return Index(sector) * partition_.sector_size_bytes();
}
- // Returns the sector that contains this address. The address must be valid.
- SectorDescriptor& FromAddress(Address address) {
- const size_t index = address / partition_.sector_size_bytes();
+ SectorDescriptor& FromAddress(Address address) const {
// TODO: Add boundary checking once asserts are supported.
// DCHECK_LT(index, sector_map_size_);`
- return descriptors_[index];
- }
-
- const SectorDescriptor& FromAddress(Address address) const {
return descriptors_[address / partition_.sector_size_bytes()];
}
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 40d31a3..1fa5297 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -188,7 +188,10 @@
// recovery, will do any needed repairing of corruption. Does garbage
// collection of part of the KVS, typically a single sector or similar unit
// that makes sense for the KVS implementation.
- Status PartialMaintenance() { return GarbageCollect(span<const Address>()); }
+ Status PartialMaintenance() {
+ CheckForErrors();
+ return GarbageCollect(span<const Address>());
+ }
void LogDebugInfo() const;
@@ -301,6 +304,9 @@
size_t redundancy() const { return entry_cache_.redundancy(); }
bool error_detected() const { return error_detected_; }
+
+ // Check KVS for any error conditions. Primarily intended for test and
+ // internal use.
bool CheckForErrors();
protected:
@@ -342,6 +348,30 @@
StatusWithSize ValueSize(const EntryMetadata& metadata) const;
+ Status ReadEntry(const EntryMetadata& metadata, Entry& entry) const;
+
+ // Finds the metadata for an entry matching a particular key. Searches for a
+ // KeyDescriptor that matches this key and sets *metadata to point to it if
+ // one is found.
+ //
+ // OK: there is a matching descriptor and *metadata is set
+ // NOT_FOUND: there is no descriptor that matches this key, but this key
+ // has a unique hash (and could potentially be added to the
+ // KVS)
+ // ALREADY_EXISTS: there is no descriptor that matches this key, but the
+ // key's hash collides with the hash for an existing
+ // descriptor
+ //
+ Status FindEntry(std::string_view key, EntryMetadata* metadata) const;
+
+ // Searches for a KeyDescriptor that matches this key and sets *metadata to
+ // point to it if one is found.
+ //
+ // OK: there is a matching descriptor and *metadata is set
+ // NOT_FOUND: there is no descriptor that matches this key
+ //
+ Status FindExisting(std::string_view key, EntryMetadata* metadata) const;
+
StatusWithSize Get(std::string_view key,
const EntryMetadata& metadata,
span<std::byte> value_buffer,
@@ -387,6 +417,10 @@
std::string_view key,
span<const std::byte> value);
+ StatusWithSize CopyEntryToSector(Entry& entry,
+ SectorDescriptor* new_sector,
+ Address& new_address);
+
Status RelocateEntry(const EntryMetadata& metadata,
KeyValueStore::Address& address,
span<const Address> addresses_to_skip);
@@ -396,7 +430,7 @@
Status GarbageCollect(span<const Address> addresses_to_skip);
Status RelocateKeyAddressesInSector(SectorDescriptor& sector_to_gc,
- const EntryMetadata& descriptor,
+ EntryMetadata& descriptor,
span<const Address> addresses_to_skip);
Status GarbageCollectSector(SectorDescriptor& sector_to_gc,
@@ -445,7 +479,9 @@
};
InitializationState initialized_;
- bool error_detected_;
+ // error_detected_ needs to be set from const KVS methods (such as Get), so
+ // make it mutable.
+ mutable bool error_detected_;
struct ErrorStats {
size_t corrupt_sectors_recovered;