pw_kvs: Add KVS error repair/recovery
- Add repair of detected errors in the KVS.
- Add additional error detection in KVS init
- KVS initialization has a new "needs maintenance" state. This is a
read-only mode because not all of the required KVS system invariants
have been met.
Change-Id: If13094855d8da03d59011891c8ed5af2ad38ad2d
diff --git a/pw_kvs/entry_cache.cc b/pw_kvs/entry_cache.cc
index 447417d..1f4dfa8 100644
--- a/pw_kvs/entry_cache.cc
+++ b/pw_kvs/entry_cache.cc
@@ -31,6 +31,23 @@
} // namespace
+void EntryMetadata::RemoveAddress(Address address_to_remove) {
+ // Find the index of the address to remove.
+ for (Address& address : addresses_) {
+ if (address == address_to_remove) {
+ // Move the address at the back of the list to the slot of the address
+ // being removed. Do this unconditionally, even if the address to remove
+ // is the last slot since the logic still works.
+ address = addresses_.back();
+
+ // Remove the back entry of the address list.
+ addresses_.back() = kNoAddress;
+ addresses_ = span(addresses_.begin(), addresses_.size() - 1);
+ break;
+ }
+ }
+}
+
void EntryMetadata::Reset(const KeyDescriptor& descriptor, Address address) {
*descriptor_ = descriptor;
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d0ab512..9fa7199 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -48,13 +48,15 @@
sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
entry_cache_(key_descriptor_list, addresses, redundancy),
options_(options),
- initialized_(false),
+ initialized_(InitializationState::kNotInitialized),
error_detected_(false),
+ error_stats_({}),
last_transaction_id_(0) {}
Status KeyValueStore::Init() {
- initialized_ = false;
+ initialized_ = InitializationState::kNotInitialized;
error_detected_ = false;
+ error_stats_ = {};
last_transaction_id_ = 0;
sectors_.Reset();
entry_cache_.Reset();
@@ -191,26 +193,34 @@
sectors_.set_last_new_sector(newest_key);
- if (error_detected_) {
- Status recovery_status = Repair();
- if (recovery_status.ok()) {
- INF("KVS init: Corruption detected and fully repaired");
- } else {
- ERR("KVS init: Corruption detected and unable repair");
- }
- }
-
if (!empty_sector_found) {
- // TODO: Record/report the error condition and recovery result.
- Status gc_result = GarbageCollectPartial();
-
- if (!gc_result.ok()) {
- ERR("KVS init failed: Unable to maintain required free sector");
- return Status::INTERNAL;
- }
+ error_detected_ = true;
}
- initialized_ = true;
+ if (!error_detected_) {
+ initialized_ = InitializationState::kReady;
+ } else {
+ if (options_.recovery != ErrorRecovery::kManual) {
+ WRN("KVS init: Corruption detected, beginning repair");
+ Status recovery_status = Repair();
+
+ if (recovery_status.ok()) {
+ WRN("KVS init: Corruption detected and fully repaired");
+ initialized_ = InitializationState::kReady;
+ total_corrupt_bytes = 0;
+ corrupt_entries = 0;
+ } else if (recovery_status == Status::RESOURCE_EXHAUSTED) {
+ WRN("KVS init: Unable to maintain required free sector");
+ initialized_ = InitializationState::kNeedsMaintenance;
+ } else {
+        WRN("KVS init: Corruption detected and unable to repair");
+ initialized_ = InitializationState::kNeedsMaintenance;
+ }
+ } else {
+ WRN("KVS init: Corruption detected, no repair attempted due to options");
+ initialized_ = InitializationState::kNeedsMaintenance;
+ }
+ }
INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
"%zu, logical sector size %zu bytes",
@@ -219,6 +229,7 @@
sectors_.size(),
partition_.sector_size_bytes());
+  // Report any corruption that was not repaired.
if (total_corrupt_bytes > 0) {
WRN("Found %zu corrupt bytes and %d corrupt entries during init process; "
"some keys may be missing",
@@ -231,9 +242,12 @@
}
KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
- StorageStats stats{0, 0, 0};
+ StorageStats stats{};
const size_t sector_size = partition_.sector_size_bytes();
bool found_empty_sector = false;
+ stats.corrupt_sectors_recovered = error_stats_.corrupt_sectors_recovered;
+ stats.missing_redundant_entries_recovered =
+ error_stats_.missing_redundant_entries_recovered;
for (const SectorDescriptor& sector : sectors_) {
stats.in_use_bytes += sector.valid_bytes();
@@ -255,6 +269,28 @@
return stats;
}
+bool KeyValueStore::CheckForErrors() {
+ // Check for corrupted sectors
+ for (SectorDescriptor& sector : sectors_) {
+ if (sector.corrupt()) {
+ error_detected_ = true;
+ break;
+ }
+ }
+
+ // Check for missing redundancy.
+ if (redundancy() > 1) {
+ for (const EntryMetadata& metadata : entry_cache_) {
+ if (metadata.addresses().size() < redundancy()) {
+ error_detected_ = true;
+ break;
+ }
+ }
+ }
+
+ return error_detected();
+}
+
Status KeyValueStore::LoadEntry(Address entry_address,
Address* next_entry_address) {
Entry entry;
@@ -304,7 +340,7 @@
StatusWithSize KeyValueStore::Get(string_view key,
span<byte> value_buffer,
size_t offset_bytes) const {
- TRY_WITH_SIZE(CheckOperation(key));
+ TRY_WITH_SIZE(CheckReadOperation(key));
EntryMetadata metadata;
TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -313,12 +349,11 @@
}
Status KeyValueStore::PutBytes(string_view key, span<const byte> value) {
+ TRY(CheckWriteOperation(key));
DBG("Writing key/value; key length=%zu, value length=%zu",
key.size(),
value.size());
- TRY(CheckOperation(key));
-
if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
DBG("%zu B value with %zu B key cannot fit in one sector",
value.size(),
@@ -346,7 +381,7 @@
}
Status KeyValueStore::Delete(string_view key) {
- TRY(CheckOperation(key));
+ TRY(CheckWriteOperation(key));
EntryMetadata metadata;
TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -391,7 +426,7 @@
}
StatusWithSize KeyValueStore::ValueSize(string_view key) const {
- TRY_WITH_SIZE(CheckOperation(key));
+ TRY_WITH_SIZE(CheckReadOperation(key));
EntryMetadata metadata;
TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -426,7 +461,7 @@
Status KeyValueStore::FixedSizeGet(std::string_view key,
void* value,
size_t size_bytes) const {
- TRY(CheckOperation(key));
+  TRY(CheckReadOperation(key));
EntryMetadata metadata;
TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -463,16 +498,31 @@
return StatusWithSize(entry.value_size());
}
-Status KeyValueStore::CheckOperation(string_view key) const {
+Status KeyValueStore::CheckWriteOperation(string_view key) const {
if (InvalidKey(key)) {
return Status::INVALID_ARGUMENT;
}
+
+ // For normal write operation the KVS must be fully ready.
if (!initialized()) {
return Status::FAILED_PRECONDITION;
}
return Status::OK;
}
+Status KeyValueStore::CheckReadOperation(string_view key) const {
+ if (InvalidKey(key)) {
+ return Status::INVALID_ARGUMENT;
+ }
+
+ // Operations that are explicitly read-only can be done after init() has been
+ // called but not fully ready (when needing maintenance).
+ if (initialized_ == InitializationState::kNotInitialized) {
+ return Status::FAILED_PRECONDITION;
+ }
+ return Status::OK;
+}
+
Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
EntryState new_state,
string_view key,
@@ -575,7 +625,7 @@
do_auto_gc = false;
}
// Garbage collect and then try again to find the best sector.
- Status gc_status = GarbageCollectPartial(reserved);
+ Status gc_status = GarbageCollect(reserved);
if (!gc_status.ok()) {
if (gc_status == Status::NOT_FOUND) {
// Not enough space, and no reclaimable bytes, this KVS is full!
@@ -603,6 +653,16 @@
return result;
}
+Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
+ SectorDescriptor* sector) {
+ if (!status.ok()) {
+ DBG(" Sector %u corrupt", sectors_.Index(sector));
+ sector->mark_corrupt();
+ error_detected_ = true;
+ }
+ return status;
+}
+
Status KeyValueStore::AppendEntry(const Entry& entry,
string_view key,
span<const byte> value) {
@@ -618,11 +678,11 @@
entry.size(),
size_t(entry.address()),
result.size());
- return result.status();
+    TRY(MarkSectorCorruptIfNotOk(result.status(), &sector));
}
if (options_.verify_on_write) {
- TRY(entry.VerifyChecksumInFlash());
+    TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
}
sector.AddValidBytes(result.size());
@@ -648,20 +708,32 @@
const Address new_address = sectors_.NextWritableAddress(*new_sector);
const StatusWithSize result = entry.Copy(new_address);
- new_sector->RemoveWritableBytes(result.size());
- TRY(result);
+ TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+ if (options_.verify_on_write) {
+ TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+ }
// Entry was written successfully; update descriptor's address and the sector
// descriptors to reflect the new entry.
- sectors_.FromAddress(address).RemoveValidBytes(result.size());
+ new_sector->RemoveWritableBytes(result.size());
new_sector->AddValidBytes(result.size());
+ sectors_.FromAddress(address).RemoveValidBytes(result.size());
address = new_address;
return Status::OK;
}
-Status KeyValueStore::GarbageCollectFull() {
- DBG("Garbage Collect all sectors");
+Status KeyValueStore::FullMaintenance() {
+ if (initialized_ == InitializationState::kNotInitialized) {
+ return Status::FAILED_PRECONDITION;
+ }
+
+ DBG("Do full maintenance");
+
+ if (error_detected_) {
+ TRY(Repair());
+ }
SectorDescriptor* sector = sectors_.last_new();
@@ -678,12 +750,20 @@
}
}
- DBG("Garbage Collect all complete");
+ DBG("Full maintenance complete");
return Status::OK;
}
-Status KeyValueStore::GarbageCollectPartial(
- span<const Address> reserved_addresses) {
+Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
+ if (initialized_ == InitializationState::kNotInitialized) {
+ return Status::FAILED_PRECONDITION;
+ }
+
+ // Do automatic repair, if KVS options allow for it.
+ if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
+ TRY(Repair());
+ }
+
DBG("Garbage Collect a single sector");
for (Address address : reserved_addresses) {
DBG(" Avoid address %u", unsigned(address));
@@ -720,6 +800,7 @@
Status KeyValueStore::GarbageCollectSector(
SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
+ DBG(" Garbage Collect sector %u", sectors_.Index(sector_to_gc));
// Step 1: Move any valid entries in the GC sector to other sectors
if (sector_to_gc.valid_bytes() != 0) {
for (const EntryMetadata& metadata : entry_cache_) {
@@ -736,7 +817,7 @@
}
// Step 2: Reinitialize the sector
- sector_to_gc.set_writable_bytes(0);
+ sector_to_gc.mark_corrupt();
TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
@@ -744,6 +825,171 @@
return Status::OK;
}
+// Add any missing redundant entries/copies for a key.
+Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
+ SectorDescriptor* new_sector;
+
+ Entry entry;
+
+ // For simplicity use just the first copy. Any known bad copies should have
+ // been removed already.
+ // TODO: Add support to read other copies if needed.
+ TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+ TRY(entry.VerifyChecksumInFlash());
+
+ for (size_t i = metadata.addresses().size();
+ metadata.addresses().size() < redundancy();
+ i++) {
+ TRY(sectors_.FindSpace(&new_sector, entry.size(), metadata.addresses()));
+
+ const Address new_address = sectors_.NextWritableAddress(*new_sector);
+ const StatusWithSize result = entry.Copy(new_address);
+ TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+ if (options_.verify_on_write) {
+ TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+ }
+ // Entry was written successfully; update descriptor's address and the
+ // sector descriptors to reflect the new entry.
+ new_sector->RemoveWritableBytes(result.size());
+ new_sector->AddValidBytes(result.size());
+
+ metadata.AddNewAddress(new_address);
+ }
+ return Status::OK;
+}
+
+Status KeyValueStore::RepairCorruptSectors() {
+ // Try to GC each corrupt sector, even if previous sectors fail. If GC of a
+ // sector failed on the first pass, then do a second pass, since a later
+ // sector might have cleared up space or otherwise unblocked the earlier
+ // failed sector.
+ Status repair_status = Status::OK;
+
+ size_t loop_count = 0;
+ do {
+ loop_count++;
+ // Error of RESOURCE_EXHAUSTED indicates no space found for relocation.
+ // Reset back to OK for the next pass.
+ if (repair_status == Status::RESOURCE_EXHAUSTED) {
+ repair_status = Status::OK;
+ }
+
+ DBG(" Pass %u", unsigned(loop_count));
+ for (SectorDescriptor& sector : sectors_) {
+ if (sector.corrupt()) {
+ DBG(" Found sector %u with corruption", sectors_.Index(sector));
+ Status sector_status = GarbageCollectSector(sector, {});
+ if (sector_status.ok()) {
+ error_stats_.corrupt_sectors_recovered += 1;
+ } else if (repair_status.ok() ||
+ repair_status == Status::RESOURCE_EXHAUSTED) {
+ repair_status = sector_status;
+ }
+ }
+ }
+ DBG(" Pass %u complete", unsigned(loop_count));
+ } while (!repair_status.ok() && loop_count < 2);
+
+ return repair_status;
+}
+
+Status KeyValueStore::EnsureFreeSectorExists() {
+ Status repair_status = Status::OK;
+ bool empty_sector_found = false;
+
+ DBG(" Find empty sector");
+ for (SectorDescriptor& sector : sectors_) {
+ if (sector.Empty(partition_.sector_size_bytes())) {
+ empty_sector_found = true;
+ DBG(" Empty sector found");
+ break;
+ }
+ }
+ if (empty_sector_found == false) {
+ DBG(" No empty sector found, attempting to GC a free sector");
+ Status sector_status = GarbageCollect(span<const Address, 0>());
+ if (repair_status.ok() && !sector_status.ok()) {
+ DBG(" Unable to free an empty sector");
+ repair_status = sector_status;
+ }
+ }
+
+ return repair_status;
+}
+
+Status KeyValueStore::EnsureEntryRedundancy() {
+ Status repair_status = Status::OK;
+
+ if (redundancy() == 1) {
+ DBG(" Redundancy not in use, nothing to check");
+ return Status::OK;
+ }
+
+ DBG(" Write any needed additional duplicate copies of key to fulfill %u "
+ "redundancy",
+ unsigned(redundancy()));
+ for (const EntryMetadata& metadata : entry_cache_) {
+ if (metadata.addresses().size() >= redundancy()) {
+ continue;
+ }
+
+ DBG(" Key with %u of %u copies found, adding missing copies",
+ unsigned(metadata.addresses().size()),
+ unsigned(redundancy()));
+ // TODO: Add non-const iterator to the entry_cache_ and remove this
+ // const_cast.
+ Status fill_status =
+ AddRedundantEntries(const_cast<EntryMetadata&>(metadata));
+ if (fill_status.ok()) {
+ error_stats_.missing_redundant_entries_recovered += 1;
+ DBG(" Key missing copies added");
+ } else {
+ DBG(" Failed to add key missing copies");
+ if (repair_status.ok()) {
+ repair_status = fill_status;
+ }
+ }
+ }
+
+ return repair_status;
+}
+
+Status KeyValueStore::Repair() {
+ // Collect and return the first error encountered.
+ Status overall_status = Status::OK;
+
+ DBG("KVS repair");
+
+ // Step 1: Garbage collect any sectors marked as corrupt.
+ Status repair_status = RepairCorruptSectors();
+ if (overall_status.ok()) {
+ overall_status = repair_status;
+ }
+
+ // Step 2: Make sure there is at least 1 empty sector. This needs to be a
+  // separate check of sectors from step 1, because a found empty sector might
+ // get written to by a later GC that fails and does not result in a free
+ // sector.
+ repair_status = EnsureFreeSectorExists();
+ if (overall_status.ok()) {
+ overall_status = repair_status;
+ }
+
+ // Step 3: Make sure each stored key has the full number of redundant
+ // entries.
+ repair_status = EnsureEntryRedundancy();
+ if (overall_status.ok()) {
+ overall_status = repair_status;
+ }
+
+ if (overall_status.ok()) {
+ error_detected_ = false;
+ initialized_ = InitializationState::kReady;
+ }
+ return overall_status;
+}
+
KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
string_view key,
span<const byte> value,
@@ -862,7 +1108,7 @@
void KeyValueStore::LogKeyDescriptor() const {
DBG("Key descriptors: count %zu", entry_cache_.total_entries());
- for (auto& metadata : entry_cache_) {
+ for (const EntryMetadata& metadata : entry_cache_) {
DBG(" - Key: %s, hash %#zx, transaction ID %zu, first address %#zx",
metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
static_cast<size_t>(metadata.hash()),
diff --git a/pw_kvs/key_value_store_binary_format_test.cc b/pw_kvs/key_value_store_binary_format_test.cc
index 6ae2e5d..8042712 100644
--- a/pw_kvs/key_value_store_binary_format_test.cc
+++ b/pw_kvs/key_value_store_binary_format_test.cc
@@ -99,8 +99,17 @@
}
constexpr uint32_t kMagic = 0xc001beef;
+
constexpr Options kNoGcOptions{
.gc_on_write = GargbageCollectOnWrite::kDisabled,
+ .recovery = ErrorRecovery::kManual,
+ .verify_on_read = true,
+ .verify_on_write = true,
+};
+
+constexpr Options kRecoveryNoGcOptions{
+ .gc_on_write = GargbageCollectOnWrite::kDisabled,
+ .recovery = ErrorRecovery::kLazy,
.verify_on_read = true,
.verify_on_write = true,
};
@@ -211,8 +220,8 @@
flash_.buffer()[1025] = byte(0xef);
ASSERT_EQ(Status::DATA_LOSS, kvs_.Init());
- EXPECT_EQ(Status::RESOURCE_EXHAUSTED, kvs_.Put("hello", ByteStr("world")));
- EXPECT_EQ(Status::RESOURCE_EXHAUSTED, kvs_.Put("a", ByteStr("b")));
+ EXPECT_EQ(Status::FAILED_PRECONDITION, kvs_.Put("hello", ByteStr("world")));
+ EXPECT_EQ(Status::FAILED_PRECONDITION, kvs_.Put("a", ByteStr("b")));
// Existing valid entries should still be readable.
EXPECT_EQ(1u, kvs_.size());
@@ -230,8 +239,8 @@
InitFlashTo(AsBytes(kEntry1, kEntry2));
// Corrupt all of the 4 512-byte flash sectors. Leave the pre-init entries
- // intact. A corrupt sector without entries should be GC'ed on init because
- // the KVS must maintain one empty sector at all times.
+ // intact. The KVS should be unavailable because recovery is set to full
+ // manual, and it does not have the required one empty sector at all times.
flash_.buffer()[64] = byte(0xef);
flash_.buffer()[513] = byte(0xef);
flash_.buffer()[1025] = byte(0xef);
@@ -241,7 +250,7 @@
auto stats = kvs_.GetStorageStats();
EXPECT_EQ(64u, stats.in_use_bytes);
- EXPECT_EQ(3 * 512u - 64u, stats.reclaimable_bytes);
+ EXPECT_EQ(4 * 512u - 64u, stats.reclaimable_bytes);
EXPECT_EQ(0u, stats.writable_bytes);
}
@@ -281,8 +290,8 @@
auto stats = kvs_.GetStorageStats();
EXPECT_EQ(stats.in_use_bytes, 0u);
- EXPECT_EQ(stats.reclaimable_bytes, 32u);
- EXPECT_EQ(stats.writable_bytes, 512u * 3 - 32);
+ EXPECT_EQ(stats.reclaimable_bytes, 512u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 2);
// The bytes were marked used, so a new key should not overlap with the bytes
// from the failed Put.
@@ -290,8 +299,201 @@
stats = kvs_.GetStorageStats();
EXPECT_EQ(stats.in_use_bytes, (32u * kvs_.redundancy()));
- EXPECT_EQ(stats.reclaimable_bytes, 32u);
- EXPECT_EQ(stats.writable_bytes, 512u * 3 - (32 + (32 * kvs_.redundancy())));
+ EXPECT_EQ(stats.reclaimable_bytes, 512u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 2 - (32 * kvs_.redundancy()));
+}
+
+class KvsErrorRecovery : public ::testing::Test {
+ protected:
+ KvsErrorRecovery()
+ : flash_(internal::Entry::kMinAlignmentBytes),
+ partition_(&flash_),
+ kvs_(&partition_,
+ {.magic = kMagic, .checksum = &checksum},
+ kRecoveryNoGcOptions) {}
+
+ void InitFlashTo(span<const byte> contents) {
+ partition_.Erase();
+ std::memcpy(flash_.buffer().data(), contents.data(), contents.size());
+ }
+
+ FakeFlashBuffer<512, 4> flash_;
+ FlashPartition partition_;
+ KeyValueStoreBuffer<kMaxEntries, kMaxUsableSectors> kvs_;
+};
+
+TEST_F(KvsErrorRecovery, Init_Ok) {
+ InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+ EXPECT_EQ(Status::OK, kvs_.Init());
+ byte buffer[64];
+ EXPECT_EQ(Status::OK, kvs_.Get("key1", buffer).status());
+ EXPECT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+}
+
+TEST_F(KvsErrorRecovery, Init_DuplicateEntries_RecoversDuringInit) {
+ InitFlashTo(AsBytes(kEntry1, kEntry1));
+
+ EXPECT_EQ(Status::OK, kvs_.Init());
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 1u);
+
+ byte buffer[64];
+ EXPECT_EQ(Status::OK, kvs_.Get("key1", buffer).status());
+ EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("k2", buffer).status());
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptEntry_FindsSubsequentValidEntry) {
+ // Corrupt each byte in the first entry once.
+ for (size_t i = 0; i < kEntry1.size(); ++i) {
+ InitFlashTo(AsBytes(kEntry1, kEntry2));
+ flash_.buffer()[i] = byte(int(flash_.buffer()[i]) + 1);
+
+ ASSERT_EQ(Status::OK, kvs_.Init());
+ byte buffer[64];
+ ASSERT_EQ(Status::NOT_FOUND, kvs_.Get("key1", buffer).status());
+ ASSERT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+
+ auto stats = kvs_.GetStorageStats();
+ // One valid entry.
+ ASSERT_EQ(32u, stats.in_use_bytes);
+ // The sector with corruption should have been recovered.
+ ASSERT_EQ(0u, stats.reclaimable_bytes);
+ ASSERT_EQ(1u, stats.corrupt_sectors_recovered);
+ }
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptEntry_CorrectlyAccountsForSectorSize) {
+ InitFlashTo(AsBytes(kEntry1, kEntry2, kEntry3, kEntry4));
+
+ // Corrupt the first and third entries.
+ flash_.buffer()[9] = byte(0xef);
+ flash_.buffer()[67] = byte(0xef);
+
+ ASSERT_EQ(Status::OK, kvs_.Init());
+
+ EXPECT_EQ(2u, kvs_.size());
+
+ byte buffer[64];
+ EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("key1", buffer).status());
+ EXPECT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+ EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("k3y", buffer).status());
+ EXPECT_EQ(Status::OK, kvs_.Get("4k", buffer).status());
+
+ auto stats = kvs_.GetStorageStats();
+ ASSERT_EQ(64u, stats.in_use_bytes);
+ ASSERT_EQ(0u, stats.reclaimable_bytes);
+ ASSERT_EQ(1472u, stats.writable_bytes);
+ ASSERT_EQ(1u, stats.corrupt_sectors_recovered);
+}
+
+TEST_F(KvsErrorRecovery, Init_ReadError_IsNotInitialized) {
+ InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+ flash_.InjectReadError(
+ FlashError::InRange(Status::UNAUTHENTICATED, kEntry1.size()));
+
+ EXPECT_EQ(Status::UNKNOWN, kvs_.Init());
+ EXPECT_FALSE(kvs_.initialized());
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptSectors_ShouldBeUnwritable) {
+ InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+ // Corrupt 3 of the 4 512-byte flash sectors. Corrupt sectors should be
+ // recovered via garbage collection.
+ flash_.buffer()[1] = byte(0xef);
+ flash_.buffer()[513] = byte(0xef);
+ flash_.buffer()[1025] = byte(0xef);
+
+ ASSERT_EQ(Status::OK, kvs_.Init());
+ EXPECT_EQ(Status::OK, kvs_.Put("hello", ByteStr("world")));
+ EXPECT_EQ(Status::OK, kvs_.Put("a", ByteStr("b")));
+
+ // Existing valid entries should still be readable.
+ EXPECT_EQ(3u, kvs_.size());
+ byte buffer[64];
+ EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("key1", buffer).status());
+ EXPECT_EQ(Status::OK, kvs_.Get("k2", buffer).status());
+
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(96u, stats.in_use_bytes);
+ EXPECT_EQ(0u, stats.reclaimable_bytes);
+ EXPECT_EQ(1440u, stats.writable_bytes);
+ ASSERT_EQ(3u, stats.corrupt_sectors_recovered);
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptSectors_ShouldRecoverOne) {
+ InitFlashTo(AsBytes(kEntry1, kEntry2));
+
+ // Corrupt all of the 4 512-byte flash sectors. Leave the pre-init entries
+ // intact. As part of recovery all corrupt sectors should get garbage
+ // collected.
+ flash_.buffer()[64] = byte(0xef);
+ flash_.buffer()[513] = byte(0xef);
+ flash_.buffer()[1025] = byte(0xef);
+ flash_.buffer()[1537] = byte(0xef);
+
+ ASSERT_EQ(Status::OK, kvs_.Init());
+
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(64u, stats.in_use_bytes);
+ EXPECT_EQ(0u, stats.reclaimable_bytes);
+ EXPECT_EQ(3 * 512u - 64u, stats.writable_bytes);
+}
+
+TEST_F(KvsErrorRecovery, Init_CorruptKey_RevertsToPreviousVersion) {
+ constexpr auto kVersion7 =
+ MakeValidEntry(kMagic, 7, "my_key", ByteStr("version 7"));
+ constexpr auto kVersion8 =
+ MakeValidEntry(kMagic, 8, "my_key", ByteStr("version 8"));
+
+ InitFlashTo(AsBytes(kVersion7, kVersion8));
+
+ // Corrupt a byte of entry version 8 (addresses 32-63).
+ flash_.buffer()[34] = byte(0xef);
+
+ ASSERT_EQ(Status::OK, kvs_.Init());
+
+ char buffer[64] = {};
+
+ EXPECT_EQ(1u, kvs_.size());
+
+ auto result = kvs_.Get("my_key", as_writable_bytes(span(buffer)));
+ EXPECT_EQ(Status::OK, result.status());
+ EXPECT_EQ(sizeof("version 7") - 1, result.size());
+ EXPECT_STREQ("version 7", buffer);
+
+ EXPECT_EQ(32u, kvs_.GetStorageStats().in_use_bytes);
+}
+
+TEST_F(KvsErrorRecovery, Put_WriteFailure_EntryNotAddedButBytesMarkedWritten) {
+ ASSERT_EQ(Status::OK, kvs_.Init());
+ flash_.InjectWriteError(FlashError::Unconditional(Status::UNAVAILABLE, 1));
+
+ EXPECT_EQ(Status::UNAVAILABLE, kvs_.Put("key1", ByteStr("value1")));
+ EXPECT_EQ(true, kvs_.error_detected());
+
+ EXPECT_EQ(Status::NOT_FOUND, kvs_.Get("key1", span<byte>()).status());
+ ASSERT_TRUE(kvs_.empty());
+
+ auto stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, 0u);
+ EXPECT_EQ(stats.reclaimable_bytes, 512u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 2);
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 0u);
+
+ // The bytes were marked used, so a new key should not overlap with the bytes
+ // from the failed Put.
+ EXPECT_EQ(Status::OK, kvs_.Put("key1", ByteStr("value1")));
+
+ stats = kvs_.GetStorageStats();
+ EXPECT_EQ(stats.in_use_bytes, (32u * kvs_.redundancy()));
+ EXPECT_EQ(stats.reclaimable_bytes, 512u);
+ EXPECT_EQ(stats.writable_bytes, 512u * 2 - (32 * kvs_.redundancy()));
+ EXPECT_EQ(stats.corrupt_sectors_recovered, 0u);
+ EXPECT_EQ(stats.missing_redundant_entries_recovered, 0u);
}
constexpr uint32_t kAltMagic = 0xbadD00D;
@@ -328,7 +530,7 @@
{.magic = kAltMagic, .checksum = &alt_checksum},
{.magic = kNoChecksumMagic, .checksum = nullptr},
}},
- kNoGcOptions) {
+ kRecoveryNoGcOptions) {
partition_.Erase();
std::memcpy(flash_.buffer().data(),
kInitialContents.data(),
diff --git a/pw_kvs/key_value_store_map_test.cc b/pw_kvs/key_value_store_map_test.cc
index 1f46d14..e2d8f68 100644
--- a/pw_kvs/key_value_store_map_test.cc
+++ b/pw_kvs/key_value_store_map_test.cc
@@ -319,7 +319,7 @@
void GCFull() {
StartOperation("GCFull");
- Status status = kvs_.GarbageCollectFull();
+ Status status = kvs_.FullMaintenance();
EXPECT_EQ(Status::OK, status);
KeyValueStore::StorageStats post_stats = kvs_.GetStorageStats();
EXPECT_EQ(post_stats.reclaimable_bytes, 0U);
@@ -329,7 +329,7 @@
void GCPartial() {
StartOperation("GCPartial");
KeyValueStore::StorageStats pre_stats = kvs_.GetStorageStats();
- Status status = kvs_.GarbageCollectPartial();
+ Status status = kvs_.PartialMaintenance();
KeyValueStore::StorageStats post_stats = kvs_.GetStorageStats();
if (pre_stats.reclaimable_bytes != 0) {
EXPECT_EQ(Status::OK, status);
diff --git a/pw_kvs/public/pw_kvs/internal/entry_cache.h b/pw_kvs/public/pw_kvs/internal/entry_cache.h
index 48688a6..fde68fe 100644
--- a/pw_kvs/public/pw_kvs/internal/entry_cache.h
+++ b/pw_kvs/public/pw_kvs/internal/entry_cache.h
@@ -58,6 +58,9 @@
addresses_ = span(addresses_.begin(), addresses_.size() + 1);
}
+ // Remove an address from the entry metadata.
+ void RemoveAddress(Address address_to_remove);
+
// Resets the KeyDescrtiptor and addresses to refer to the provided
// KeyDescriptor and address.
void Reset(const KeyDescriptor& descriptor, Address address);
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 69193e4..62c037d 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -49,10 +49,15 @@
// Immediately do full recovery of any errors that are detected.
kImmediate,
- // Recover from errors, but do some time consuming steps at a later time. Such
- // as waiting to garbage collect sectors with corrupt entries until the next
- // garbage collection.
+ // Recover from errors, but delay time consuming recovery steps until later
+ // as part of other time consuming operations. Such as waiting to garbage
+ // collect sectors with corrupt entries until the next garbage collection.
kLazy,
+
+ // Only recover from errors when manually triggered as part of maintenance
+ // operations. This is not recommended for normal use, only for test or
+ // read-only use.
+ kManual,
};
struct Options {
@@ -90,7 +95,9 @@
//
Status Init();
- bool initialized() const { return initialized_; }
+ bool initialized() const {
+ return initialized_ == InitializationState::kReady;
+ }
// Reads the value of an entry in the KVS. The value is read into the provided
// buffer and the number of bytes read is returned. If desired, the read can
@@ -171,14 +178,17 @@
//
StatusWithSize ValueSize(std::string_view key) const;
- // Perform garbage collection of all reclaimable space in the KVS.
- Status GarbageCollectFull();
+  // Perform all maintenance possible, including all needed repairing of
+ // corruption and garbage collection of all reclaimable space in the KVS. When
+ // configured for manual recovery, this is the only way KVS repair is
+ // triggered.
+ Status FullMaintenance();
- // Perform garbage collection of part of the KVS, typically a single sector or
- // similar unit that makes sense for the KVS implementation.
- Status GarbageCollectPartial() {
- return GarbageCollectPartial(span<const Address>());
- }
+ // Perform a portion of KVS maintenance. If configured for at least lazy
+ // recovery, will do any needed repairing of corruption. Does garbage
+ // collection of part of the KVS, typically a single sector or similar unit
+ // that makes sense for the KVS implementation.
+ Status PartialMaintenance() { return GarbageCollect(span<const Address>()); }
void LogDebugInfo() const;
@@ -280,6 +290,8 @@
size_t writable_bytes;
size_t in_use_bytes;
size_t reclaimable_bytes;
+ size_t corrupt_sectors_recovered;
+ size_t missing_redundant_entries_recovered;
};
StorageStats GetStorageStats() const;
@@ -287,6 +299,9 @@
// Level of redundancy to use for writing entries.
size_t redundancy() const { return entry_cache_.redundancy(); }
+ bool error_detected() const { return error_detected_; }
+ bool CheckForErrors();
+
protected:
using Address = FlashPartition::Address;
using Entry = internal::Entry;
@@ -340,7 +355,8 @@
void* value,
size_t size_bytes) const;
- Status CheckOperation(std::string_view key) const;
+ Status CheckWriteOperation(std::string_view key) const;
+ Status CheckReadOperation(std::string_view key) const;
Status WriteEntryForExistingKey(EntryMetadata& metadata,
EntryState new_state,
@@ -364,6 +380,8 @@
size_t entry_size,
span<const Address> addresses_to_skip);
+ Status MarkSectorCorruptIfNotOk(Status status, SectorDescriptor* sector);
+
Status AppendEntry(const Entry& entry,
std::string_view key,
span<const std::byte> value);
@@ -372,7 +390,9 @@
KeyValueStore::Address& address,
span<const Address> addresses_to_skip);
- Status GarbageCollectPartial(span<const Address> addresses_to_skip);
+  // Find and garbage collect a single sector that does not include an address to
+ // skip.
+ Status GarbageCollect(span<const Address> addresses_to_skip);
Status RelocateKeyAddressesInSector(SectorDescriptor& sector_to_gc,
const EntryMetadata& descriptor,
@@ -381,7 +401,15 @@
Status GarbageCollectSector(SectorDescriptor& sector_to_gc,
span<const Address> addresses_to_skip);
- Status Repair() { return Status::UNIMPLEMENTED; }
+ Status AddRedundantEntries(EntryMetadata& metadata);
+
+ Status RepairCorruptSectors();
+
+ Status EnsureFreeSectorExists();
+
+ Status EnsureEntryRedundancy();
+
+ Status Repair();
internal::Entry CreateEntry(Address address,
std::string_view key,
@@ -403,10 +431,27 @@
Options options_;
- bool initialized_;
+ enum class InitializationState {
+ // KVS Init() has not been called and KVS is not usable.
+ kNotInitialized,
+
+ // KVS Init() run but not all the needed invariants are met for an operating
+ // KVS. KVS is not writable, but read operaions are possible.
+ kNeedsMaintenance,
+
+ // KVS is fully ready for use.
+ kReady,
+ };
+ InitializationState initialized_;
bool error_detected_;
+ struct ErrorStats {
+ size_t corrupt_sectors_recovered;
+ size_t missing_redundant_entries_recovered;
+ };
+ ErrorStats error_stats_;
+
uint32_t last_transaction_id_;
};