pw_kvs: Add support for multiple redundant copies of entries
Add support to KVS for storing multiple identical copies of key-value
entries.
Currently only tested for kEntryRedundancy = 1.
Change-Id: Ibb8e5667f1f8406070a8d8854c1145daa76c5aea
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index eb1631f..7f2dc2d 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -108,7 +108,7 @@
if (status == Status::DATA_LOSS) {
// The entry could not be read, indicating data corruption within the
// sector. Try to scan the remainder of the sector for other entries.
- ERR("KVS init: data loss detected in sector %u at address %zu",
+ WRN("KVS init: data loss detected in sector %u at address %zu",
SectorIndex(§or),
size_t(entry_address));
@@ -171,7 +171,7 @@
for (KeyDescriptor& key_descriptor : key_descriptors_) {
for (auto& address : key_descriptor.addresses()) {
Entry entry;
- TRY(Entry::Read(partition_, key_descriptor.address(), formats_, &entry));
+ TRY(Entry::Read(partition_, address, formats_, &entry));
SectorFromAddress(address)->AddValidBytes(entry.size());
}
if (key_descriptor.IsNewerThan(last_transaction_id_)) {
@@ -376,7 +376,7 @@
if (status.ok()) {
// TODO: figure out logging how to support multiple addresses.
- DBG("Overwriting entry for key %#08" PRIx32 " in %u sectors including %u",
+ DBG("Overwriting entry for key %#010" PRIx32 " in %u sectors including %u",
key_descriptor->hash(),
unsigned(key_descriptor->addresses().size()),
SectorIndex(SectorFromAddress(key_descriptor->address())));
@@ -398,7 +398,7 @@
TRY(FindExistingKeyDescriptor(key, &key_descriptor));
// TODO: figure out logging how to support multiple addresses.
- DBG("Writing tombstone for key %#08" PRIx32 " in %u sectors including %u",
+ DBG("Writing tombstone for key %#010" PRIx32 " in %u sectors including %u",
key_descriptor->hash(),
unsigned(key_descriptor->addresses().size()),
SectorIndex(SectorFromAddress(key_descriptor->address())));
@@ -462,6 +462,8 @@
span<std::byte> value_buffer,
size_t offset_bytes) const {
Entry entry;
+ // TODO: add support for using one of the redundant entries if reading the
+ // first copy fails.
TRY_WITH_SIZE(
Entry::Read(partition_, descriptor.address(), formats_, &entry));
@@ -582,23 +584,23 @@
KeyDescriptor::State new_state,
string_view key,
span<const byte> value) {
- // Find the original entry and sector to update the sector's valid_bytes.
Entry original_entry;
+ // TODO: add support for using one of the redundant entries if reading the
+ // first copy fails.
TRY(Entry::Read(
partition_, key_descriptor->address(), formats_, &original_entry));
- SectorDescriptor* sector;
- TRY(FindOrRecoverSectorWithSpace(§or,
- Entry::size(partition_, key, value)));
- DBG("Writing existing entry; found sector %u (%#" PRIx32 ")",
- SectorIndex(sector),
- SectorBaseAddress(sector));
+ // Create a new temporary key descriptor to use while writing the new
+ // key-value out to flash. Once the writing is done, update the main
+ // descriptor for this key with the new information.
+ KeyDescriptor new_key_descriptor(key);
+ TRY(WriteEntry(&new_key_descriptor, key, value, new_state));
- // TODO: Verify the copy does a full copy including the address vector.
+ // Update the main descriptor for the new key version.
KeyDescriptor old_key_descriptor = *key_descriptor;
+ *key_descriptor = new_key_descriptor;
- TRY(AppendEntry(sector, key_descriptor, key, value, new_state));
-
+ // Remove all the valid bytes for the old key version, which are now stale.
for (auto& address : old_key_descriptor.addresses()) {
SectorFromAddress(address)->RemoveValidBytes(original_entry.size());
}
@@ -614,23 +616,151 @@
return Status::RESOURCE_EXHAUSTED;
}
- SectorDescriptor* sector;
- TRY(FindOrRecoverSectorWithSpace(§or,
- Entry::size(partition_, key, value)));
- DBG("Writing new entry; found sector: %u", SectorIndex(sector));
-
- // Create the KeyDescriptor that will be added to the list. The transaction ID
- // and address will be set by AppendEntry.
+ // Create the KeyDescriptor that will be added to the list and write it to
+ // flash.
KeyDescriptor key_descriptor(key);
- TRY(AppendEntry(sector, &key_descriptor, key, value, KeyDescriptor::kValid));
+ TRY(WriteEntry(&key_descriptor, key, value, KeyDescriptor::kValid));
// Only add the entry when we are certain the write succeeded.
key_descriptors_.push_back(key_descriptor);
return Status::OK;
}
+Status KeyValueStore::WriteEntry(KeyDescriptor* key_descriptor,
+ string_view key,
+ span<const byte> value,
+ KeyDescriptor::State new_state) {
+ size_t entry_size = Entry::size(partition_, key, value);
+
+ Entry entry = CreateEntry(0, key, value, new_state);
+ *key_descriptor = entry.descriptor(key);
+ key_descriptor->addresses().clear();
+
+ // For number of redundany entries to be written, do the following:
+ // - Find a sector to write an individual entry to. This optionally will
+ // include garbage collecting one or more sectors if needed.
+ // - Write the entry to the sector.
+ // - Repeat for redundancy number of total entries.
+ for (size_t i = 0; i < internal::kEntryRedundancy; i++) {
+ SectorDescriptor* sector;
+ TRY(GetSectorForWrite(§or, entry_size, key_descriptor));
+
+ DBG("Writing entry %zu; found sector: %u", i, SectorIndex(sector));
+ const Address write_address = NextWritableAddress(sector);
+ TRY(AppendEntry(write_address, entry, key, value));
+
+ // Entry copy was written successfully; update the key descriptor to reflect
+ // the new entry.
+ key_descriptor->addresses().push_back(write_address);
+ }
+
+ // Once all the entries are written, add valid bytes to each of the sectors
+ // that entries were written to.
+ for (auto new_address : key_descriptor->addresses()) {
+ SectorFromAddress(new_address)->AddValidBytes(entry.size());
+ }
+
+ return Status::OK;
+}
+
+// Find a sector to use for writing a new entry to. Do automatic garbage
+// collection if needed and allowed.
+//
+// OK: Sector found with needed space.
+// RESOURCE_EXHAUSTED: No sector available with the needed space.
+Status KeyValueStore::GetSectorForWrite(SectorDescriptor** sector,
+ size_t entry_size,
+ KeyDescriptor* key_descriptor) {
+ Status result = FindSectorWithSpace(
+ sector, entry_size, kAppendEntry, key_descriptor->addresses());
+
+ bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;
+
+ // Do garbage collection as needed, so long as policy allows.
+ while (result == Status::RESOURCE_EXHAUSTED && do_auto_gc) {
+ if (options_.gc_on_write == GargbageCollectOnWrite::kOneSector) {
+ // If GC config option is kOneSector clear the flag to not do any more
+ // GC after this try.
+ do_auto_gc = false;
+ }
+ // Garbage collect and then try again to find the best sector.
+ Status gc_status = GarbageCollectPartial();
+ if (!gc_status.ok()) {
+ if (gc_status == Status::NOT_FOUND) {
+ // Not enough space, and no reclaimable bytes, this KVS is full!
+ return Status::RESOURCE_EXHAUSTED;
+ }
+ return gc_status;
+ }
+
+ result = FindSectorWithSpace(
+ sector, entry_size, kAppendEntry, key_descriptor->addresses());
+ }
+
+ if (!result.ok()) {
+ WRN("Unable to find sector to write %zu B", entry_size);
+ }
+ return result;
+}
+
+Status KeyValueStore::AppendEntry(Address write_address,
+ Entry& entry,
+ string_view key,
+ span<const byte> value) {
+ entry.UpdateAddress(write_address);
+
+ StatusWithSize result = entry.Write(key, value);
+ // Remove any bytes that were written, even if the write was not successful.
+ // This is important to retain the writable space invariant on the sectors.
+ SectorFromAddress(write_address)->RemoveWritableBytes(result.size());
+
+ if (!result.ok()) {
+ ERR("Failed to write %zu bytes at %#zx. %zu actually written",
+ entry.size(),
+ size_t(write_address),
+ result.size());
+ return result.status();
+ }
+
+ if (options_.verify_on_write) {
+ TRY(entry.VerifyChecksumInFlash());
+ }
+
+ return Status::OK;
+}
+
Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
- KeyValueStore::Address address) {
+ KeyValueStore::Address old_address) {
+ Entry entry;
+ TRY(Entry::Read(partition_, old_address, formats_, &entry));
+
+ // Find a new sector for the entry and write it to the new location. For
+ // relocation the find should not not be a sector already containing the key
+ // but can be the always empty sector, since this is part of the GC process
+ // that will result in a new empty sector. Also find a sector that does not
+ // have reclaimable space (mostly for the full GC, where that would result in
+ // an immediate extra relocation).
+ SectorDescriptor* new_sector;
+
+ TRY(FindSectorWithSpace(
+ &new_sector, entry.size(), kGarbageCollect, key_descriptor.addresses()));
+ const Address new_address = NextWritableAddress(new_sector);
+ TRY(MoveEntry(new_address, entry));
+
+ // TODO: Perhaps add check that the entry matches the key descriptor (key
+ // hash, ID, checksum).
+
+ // Entry was written successfully; update the key descriptor and the sector
+ // descriptors to reflect the new entry.
+ key_descriptor.UpdateAddress(old_address, new_address);
+ new_sector->AddValidBytes(entry.size());
+ SectorFromAddress(old_address)->RemoveValidBytes(entry.size());
+
+ return Status::OK;
+}
+
+Status KeyValueStore::MoveEntry(Address new_address, Entry& entry) {
+ // Step 1: Read the old entry.
struct TempEntry {
Entry::KeyBuffer key;
std::array<byte, sizeof(working_buffer_) - sizeof(key)> value;
@@ -638,16 +768,9 @@
auto [key_buffer, value_buffer] =
*std::launder(reinterpret_cast<TempEntry*>(working_buffer_.data()));
- DBG("Relocating entry at %zx for key %#010" PRIx32,
- size_t(address),
- key_descriptor.hash());
-
// Read the entry to be relocated. Store the entry in a local variable and
// store the key and value in the TempEntry stored in the static allocated
// working_buffer_.
- Entry entry;
- TRY(Entry::Read(partition_, key_descriptor.address(), formats_, &entry));
-
TRY_ASSIGN(size_t key_length, entry.ReadKey(key_buffer));
string_view key = string_view(key_buffer.data(), key_length);
@@ -659,43 +782,30 @@
const span value = span(value_buffer.data(), result.size());
TRY(entry.VerifyChecksum(key, value));
- // Find a new sector for the entry and write it to the new location. For
- // relocation the find should not not be a sector already containing the key
- // but can be the always empty sector, since this is part of the GC process
- // that will result in a new empty sector. Also find a sector that does not
- // have reclaimable space (mostly for the full GC, where that would result in
- // an immediate extra relocation).
- SectorDescriptor* new_sector;
+ DBG("Moving %zu B entry with transaction ID %zu to address %#zx",
+ entry.size(),
+ size_t(entry.transaction_id()),
+ size_t(new_address));
- // Build a vector of sectors to avoid.
- Vector<SectorDescriptor*, internal::kEntryRedundancy> old_sectors;
- for (auto& address : key_descriptor.addresses()) {
- old_sectors.push_back(SectorFromAddress(address));
+ // Step 2: Write the entry to the new location.
+ entry.UpdateAddress(new_address);
+ result = entry.Write(key, value);
+
+ // Remove any bytes that were written, even if the write was not successful.
+ // This is important to retain the writable space invariant on the sectors.
+ SectorFromAddress(new_address)->RemoveWritableBytes(result.size());
+
+ if (!result.ok()) {
+ ERR("Failed to write %zu bytes at %" PRIx32 ". %zu actually written",
+ entry.size(),
+ new_address,
+ result.size());
+ return result.status();
}
- // TODO: Remove this once const span can take a non-const span.
- auto old_sectors_const =
- span(const_cast<const SectorDescriptor**>(old_sectors.data()),
- old_sectors.size());
-
- TRY(FindSectorWithSpace(
- &new_sector, entry.size(), kGarbageCollect, old_sectors_const));
-
- // TODO: This does an entry with new transaction ID. This needs to get changed
- // to be a copy of this entry with the same transaction ID.
- TRY(AppendEntry(
- new_sector, &key_descriptor, key, value, key_descriptor.state()));
-
- // Do the valid bytes accounting for the sector the entry was relocated from.
- // TODO: AppendEntry() creates an entry with new transaction ID. While that is
- // used all the old sectors need the valid bytes to be removed. Once it is
- // switched over to do a copy of the current entry with the same transaction
- // ID, then the valid bytes need to be removed from only the one sector being
- // relocated out of.
- // SectorFromAddress(address)->RemoveValidBytes(entry.size());
- (void)address;
- for (auto& old_sector : old_sectors) {
- old_sector->RemoveValidBytes(entry.size());
+ // Step 3: Verify write to the new location.
+ if (options_.verify_on_write) {
+ TRY(entry.VerifyChecksumInFlash());
}
return Status::OK;
@@ -709,10 +819,16 @@
SectorDescriptor** found_sector,
size_t size,
FindSectorMode find_mode,
- span<const SectorDescriptor*> sectors_to_skip) {
+ span<const Address> addresses_to_skip) {
SectorDescriptor* first_empty_sector = nullptr;
bool at_least_two_empty_sectors = (find_mode == kGarbageCollect);
+ // Build a vector of sectors to avoid.
+ Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
+ for (auto& address : addresses_to_skip) {
+ sectors_to_skip.push_back(SectorFromAddress(address));
+ }
+
DBG("Find sector with %zu bytes available, starting with sector %u",
size,
SectorIndex(last_new_sector_));
@@ -784,18 +900,6 @@
return Status::RESOURCE_EXHAUSTED;
}
-Status KeyValueStore::FindOrRecoverSectorWithSpace(SectorDescriptor** sector,
- size_t size) {
- Status result = FindSectorWithSpace(sector, size, kAppendEntry);
- if (result == Status::RESOURCE_EXHAUSTED &&
- options_.gc_on_write != GargbageCollectOnWrite::kDisabled) {
- // Garbage collect and then try again to find the best sector.
- TRY(GarbageCollectPartial());
- return FindSectorWithSpace(sector, size, kAppendEntry);
- }
- return result;
-}
-
KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorToGarbageCollect() {
const size_t sector_size_bytes = partition_.sector_size_bytes();
SectorDescriptor* sector_candidate = nullptr;
@@ -861,8 +965,8 @@
SectorDescriptor* sector_to_gc = FindSectorToGarbageCollect();
if (sector_to_gc == nullptr) {
- // Nothing to GC, all done.
- return Status::OK;
+ // Nothing to GC.
+ return Status::NOT_FOUND;
}
TRY(GarbageCollectSector(sector_to_gc));
@@ -896,43 +1000,6 @@
return Status::OK;
}
-Status KeyValueStore::AppendEntry(SectorDescriptor* sector,
- KeyDescriptor* key_descriptor,
- string_view key,
- span<const byte> value,
- KeyDescriptor::State new_state) {
- const Address address = NextWritableAddress(sector);
- Entry entry = CreateEntry(address, key, value, new_state);
-
- DBG("Appending %zu B entry with transaction ID %" PRIu32 " to address %#zx",
- entry.size(),
- entry.transaction_id(),
- size_t(address));
-
- StatusWithSize result = entry.Write(key, value);
- // Remove any bytes that were written, even if the write was not successful.
- // This is important to retain the writable space invariant on the sectors.
- sector->RemoveWritableBytes(result.size());
-
- if (!result.ok()) {
- ERR("Failed to write %zu bytes at %" PRIx32 ". %zu actually written",
- entry.size(),
- address,
- result.size());
- return result.status();
- }
-
- if (options_.verify_on_write) {
- TRY(entry.VerifyChecksumInFlash());
- }
-
- // Entry was written successfully; update the key descriptor and the sector
- // descriptor to reflect the new entry.
- entry.UpdateDescriptor(key_descriptor);
- sector->AddValidBytes(result.size());
- return Status::OK;
-}
-
KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
std::string_view key,
span<const byte> value,
diff --git a/pw_kvs/key_value_store_map_test.cc b/pw_kvs/key_value_store_map_test.cc
index a6b2555..9a33b76 100644
--- a/pw_kvs/key_value_store_map_test.cc
+++ b/pw_kvs/key_value_store_map_test.cc
@@ -311,11 +311,12 @@
StartOperation("GCPartial", "");
KeyValueStore::StorageStats pre_stats = kvs_.GetStorageStats();
Status status = kvs_.GarbageCollectPartial();
- EXPECT_EQ(Status::OK, status);
KeyValueStore::StorageStats post_stats = kvs_.GetStorageStats();
if (pre_stats.reclaimable_bytes != 0) {
+ EXPECT_EQ(Status::OK, status);
EXPECT_LT(post_stats.reclaimable_bytes, pre_stats.reclaimable_bytes);
} else {
+ EXPECT_EQ(Status::NOT_FOUND, status);
EXPECT_EQ(post_stats.reclaimable_bytes, 0U);
}
FinishOperation("GCPartial", status, "");
diff --git a/pw_kvs/public/pw_kvs/internal/entry.h b/pw_kvs/public/pw_kvs/internal/entry.h
index 773e4c3..0560f28 100644
--- a/pw_kvs/public/pw_kvs/internal/entry.h
+++ b/pw_kvs/public/pw_kvs/internal/entry.h
@@ -94,14 +94,10 @@
deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
}
- void UpdateDescriptor(KeyDescriptor* kd) {
- kd->transaction_id_ = transaction_id();
- kd->addresses_.assign(1, address_);
- kd->state_ = deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid;
- }
-
StatusWithSize Write(std::string_view key, span<const std::byte> value) const;
+ void UpdateAddress(Address address) { address_ = address; }
+
// Reads a key into a buffer, which must be large enough for a max-length key.
// If successful, the size is returned in the StatusWithSize. The key is not
// null terminated.
diff --git a/pw_kvs/public/pw_kvs/internal/key_descriptor.h b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
index 9db386e..9397c4e 100644
--- a/pw_kvs/public/pw_kvs/internal/key_descriptor.h
+++ b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
@@ -46,6 +46,19 @@
// TODO: remove address() once all the use of it is gone.
uint32_t address() const { return addresses_[0]; }
+ Status UpdateAddress(FlashPartition::Address old_address,
+ FlashPartition::Address new_address) {
+ for (auto& address : addresses()) {
+ if (address == old_address) {
+ address = new_address;
+ return Status::OK;
+ }
+ }
+
+ // Unable to find the address to update.
+ return Status::INVALID_ARGUMENT;
+ }
+
Vector<FlashPartition::Address, kEntryRedundancy>& addresses() {
return addresses_;
}
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 8b6cecf..bb4b108 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -310,8 +310,19 @@
"as_bytes(span(&value, 1)) or as_writable_bytes(span(&value, 1)).");
}
+ Status LoadEntry(Address entry_address, Address* next_entry_address);
+ Status ScanForEntry(const SectorDescriptor& sector,
+ Address start_address,
+ Address* next_entry_address);
+ Status AppendNewOrOverwriteStaleExistingDescriptor(
+ const KeyDescriptor& key_descriptor);
+
+ KeyDescriptor* FindDescriptor(uint32_t hash);
+
Status PutBytes(std::string_view key, span<const std::byte> value);
+ StatusWithSize ValueSize(const KeyDescriptor& descriptor) const;
+
StatusWithSize Get(std::string_view key,
const KeyDescriptor& descriptor,
span<std::byte> value_buffer,
@@ -326,8 +337,6 @@
void* value,
size_t size_bytes) const;
- StatusWithSize ValueSize(const KeyDescriptor& descriptor) const;
-
Status CheckOperation(std::string_view key) const;
Status FindKeyDescriptor(std::string_view key,
@@ -348,14 +357,6 @@
key, const_cast<const KeyDescriptor**>(result));
}
- Status LoadEntry(Address entry_address, Address* next_entry_address);
- Status ScanForEntry(const SectorDescriptor& sector,
- Address start_address,
- Address* next_entry_address);
- Status AppendNewOrOverwriteStaleExistingDescriptor(
- const KeyDescriptor& key_descriptor);
- Status AppendEmptyDescriptor(KeyDescriptor** new_descriptor);
-
Status WriteEntryForExistingKey(KeyDescriptor* key_descriptor,
KeyDescriptor::State new_state,
std::string_view key,
@@ -363,30 +364,35 @@
Status WriteEntryForNewKey(std::string_view key, span<const std::byte> value);
+ Status WriteEntry(KeyDescriptor* key_descriptor,
+ std::string_view key,
+ span<const std::byte> value,
+ KeyDescriptor::State new_state);
+
+ Status GetSectorForWrite(SectorDescriptor** sector,
+ size_t entry_size,
+ KeyDescriptor* key_descriptor);
+
+ Status AppendEntry(Address write_address,
+ Entry& entry,
+ std::string_view key,
+ span<const std::byte> value);
+
Status RelocateEntry(KeyDescriptor& key_descriptor,
KeyValueStore::Address address);
+ Status MoveEntry(Address new_address, Entry& entry);
+
enum FindSectorMode { kAppendEntry, kGarbageCollect };
Status FindSectorWithSpace(SectorDescriptor** found_sector,
size_t size,
FindSectorMode find_mode,
- span<const SectorDescriptor*> sector_to_skip =
- span<const SectorDescriptor*>());
-
- Status FindOrRecoverSectorWithSpace(SectorDescriptor** sector, size_t size);
-
- Status GarbageCollectSector(SectorDescriptor* sector_to_gc);
+ span<const Address> addresses_to_skip);
SectorDescriptor* FindSectorToGarbageCollect();
- KeyDescriptor* FindDescriptor(uint32_t hash);
-
- Status AppendEntry(SectorDescriptor* sector,
- KeyDescriptor* key_descriptor,
- std::string_view key,
- span<const std::byte> value,
- KeyDescriptor::State new_state);
+ Status GarbageCollectSector(SectorDescriptor* sector_to_gc);
bool AddressInSector(const SectorDescriptor& sector, Address address) const {
const Address sector_base = SectorBaseAddress(§or);