pw_kvs: Remove temp buffer; redundancy changes
- Instead of copying entries to a large working buffer, incrementally
read and checksum them and then incrementally copy them. Remove the
working buffer.
- Find space for all redundant entries before writing any of them. This
ensures that redundancy is maintained when adding entries to a
mostly-full KVS.
- Update KeyDescriptors immediately after writing the first copy of
an entry, instead of after attempting to write all copies.
- Eliminate temporary KeyDescriptor objects. Remove the public
KeyDescriptor constructors, including the default.
Change-Id: Ia3674e260c9ab0fdc01965563343b2cf5da37adf
diff --git a/pw_kvs/entry.cc b/pw_kvs/entry.cc
index fdc20ce..5795342 100644
--- a/pw_kvs/entry.cc
+++ b/pw_kvs/entry.cc
@@ -112,7 +112,7 @@
}
StatusWithSize Entry::Copy(Address new_address) const {
- PW_LOG_DEBUG("Coying entry from 0x%x to 0x%x as ID %" PRIu32,
+ PW_LOG_DEBUG("Copying entry from 0x%x to 0x%x as ID %" PRIu32,
unsigned(address()),
unsigned(new_address),
transaction_id());
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 93c077d..d76323e 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -36,6 +36,14 @@
return key.empty() || (key.size() > internal::Entry::kMaxKeyLength);
}
+// Returns true if the container conatins the value.
+// TODO: At some point move this to pw_containers, along with adding tests.
+template <typename Container, typename T>
+bool Contains(const Container& container, const T& value) {
+ return std::find(std::begin(container), std::end(container), value) !=
+ std::end(container);
+}
+
} // namespace
KeyValueStore::KeyValueStore(FlashPartition* partition,
@@ -72,14 +80,6 @@
const size_t sector_size_bytes = partition_.sector_size_bytes();
- if (working_buffer_.size() < sector_size_bytes) {
- ERR("KVS init failed: working_buffer_ (%zu B) is smaller than sector size "
- "(%zu B)",
- working_buffer_.size(),
- sector_size_bytes);
- return Status::INVALID_ARGUMENT;
- }
-
DBG("First pass: Read all entries from all sectors");
Address sector_address = 0;
@@ -457,7 +457,7 @@
return valid_entries;
}
-StatusWithSize KeyValueStore::ValueSize(std::string_view key) const {
+StatusWithSize KeyValueStore::ValueSize(string_view key) const {
TRY_WITH_SIZE(CheckOperation(key));
const KeyDescriptor* key_descriptor;
@@ -597,28 +597,13 @@
KeyDescriptor::State new_state,
string_view key,
span<const byte> value) {
- Entry original_entry;
+ // Read the original entry to get the size for sector accounting purposes.
+ Entry entry;
// TODO: add support for using one of the redundant entries if reading the
// first copy fails.
- TRY(Entry::Read(
- partition_, key_descriptor->address(), formats_, &original_entry));
+ TRY(Entry::Read(partition_, key_descriptor->address(), formats_, &entry));
- // Create a new temporary key descriptor to use while writing the new
- // key-value out to flash. Once the writing is done, update the main
- // descriptor for this key with the new information.
- KeyDescriptor new_key_descriptor(key);
- TRY(WriteEntry(&new_key_descriptor, key, value, new_state));
-
- // Update the main descriptor for the new key version.
- KeyDescriptor old_key_descriptor = *key_descriptor;
- *key_descriptor = new_key_descriptor;
-
- // Remove all the valid bytes for the old key version, which are now stale.
- for (auto& address : old_key_descriptor.addresses()) {
- SectorFromAddress(address)->RemoveValidBytes(original_entry.size());
- }
-
- return Status::OK;
+ return WriteEntry(key, value, new_state, key_descriptor, entry.size());
}
Status KeyValueStore::WriteEntryForNewKey(string_view key,
@@ -629,51 +614,66 @@
return Status::RESOURCE_EXHAUSTED;
}
- // Create the KeyDescriptor that will be added to the list and write it to
- // flash.
- KeyDescriptor key_descriptor(key);
- TRY(WriteEntry(&key_descriptor, key, value, KeyDescriptor::kValid));
+ return WriteEntry(key, value, KeyDescriptor::kValid);
+}
- // Only add the entry when we are certain the write succeeded.
- key_descriptors_.push_back(key_descriptor);
+Status KeyValueStore::WriteEntry(string_view key,
+ span<const byte> value,
+ KeyDescriptor::State new_state,
+ KeyDescriptor* prior_descriptor,
+ size_t prior_size) {
+ const size_t entry_size = Entry::size(partition_, key, value);
+
+ // List of addresses for sectors with space for this entry.
+ // TODO: The derived class should allocate space for this address list.
+ Vector<Address, internal::kEntryRedundancy> addresses;
+
+ // Find sectors to write the entry to. This may involve garbage collecting one
+ // or more sectors.
+ for (size_t i = 0; i < redundancy_; i++) {
+ SectorDescriptor* sector;
+ TRY(GetSectorForWrite(§or, entry_size, addresses));
+
+ DBG("Found space for entry in sector %u", SectorIndex(sector));
+ addresses.push_back(NextWritableAddress(sector));
+ }
+
+ // Write the entry at the first address that was found.
+ Entry entry = CreateEntry(addresses.front(), key, value, new_state);
+ TRY(AppendEntry(entry, key, value));
+
+ // After writing the first entry successfully, update the key descriptors.
+ // Once a single new the entry is written, the old entries are invalidated.
+ KeyDescriptor& new_descriptor =
+ UpdateKeyDescriptor(entry, key, prior_descriptor, prior_size);
+
+ // Write the additional copies of the entry, if redundancy is greater than 1.
+ for (size_t i = 1; i < addresses.size(); ++i) {
+ entry.set_address(addresses[i]);
+ TRY(AppendEntry(entry, key, value));
+ new_descriptor.addresses().push_back(addresses[i]);
+ }
return Status::OK;
}
-Status KeyValueStore::WriteEntry(KeyDescriptor* key_descriptor,
- string_view key,
- span<const byte> value,
- KeyDescriptor::State new_state) {
- size_t entry_size = Entry::size(partition_, key, value);
-
- Entry entry = CreateEntry(0, key, value, new_state);
- *key_descriptor = entry.descriptor(key);
- key_descriptor->addresses().clear();
-
- // For number of redundany entries to be written, do the following:
- // - Find a sector to write an individual entry to. This optionally will
- // include garbage collecting one or more sectors if needed.
- // - Write the entry to the sector.
- // - Repeat for redundancy number of total entries.
- for (size_t i = 0; i < redundancy_; i++) {
- SectorDescriptor* sector;
- TRY(GetSectorForWrite(§or, entry_size, key_descriptor->addresses()));
-
- DBG("Writing entry %zu; found sector: %u", i, SectorIndex(sector));
- const Address write_address = NextWritableAddress(sector);
- TRY(AppendEntry(write_address, entry, key, value));
-
- // Entry copy was written successfully; update the key descriptor to reflect
- // the new entry.
- key_descriptor->addresses().push_back(write_address);
+internal::KeyDescriptor& KeyValueStore::UpdateKeyDescriptor(
+ const Entry& entry,
+ string_view key,
+ KeyDescriptor* prior_descriptor,
+ size_t prior_size) {
+ // If there is no prior descriptor, create a new one.
+ if (prior_descriptor == nullptr) {
+ key_descriptors_.push_back(entry.descriptor(key));
+ return key_descriptors_.back();
}
- // Once all the entries are written, add valid bytes to each of the sectors
- // that entries were written to.
- for (auto new_address : key_descriptor->addresses()) {
- SectorFromAddress(new_address)->AddValidBytes(entry.size());
+ // Remove valid bytes for the old entry and its copies, which are now stale.
+ for (Address address : prior_descriptor->addresses()) {
+ SectorFromAddress(address)->RemoveValidBytes(prior_size);
}
- return Status::OK;
+ *prior_descriptor = entry.descriptor(prior_descriptor->hash());
+ return *prior_descriptor;
}
// Find a sector to use for writing a new entry to. Do automatic garbage
@@ -727,21 +727,20 @@
return result;
}
-Status KeyValueStore::AppendEntry(Address write_address,
- Entry& entry,
+Status KeyValueStore::AppendEntry(const Entry& entry,
string_view key,
span<const byte> value) {
- entry.set_address(write_address);
+ const StatusWithSize result = entry.Write(key, value);
- StatusWithSize result = entry.Write(key, value);
// Remove any bytes that were written, even if the write was not successful.
// This is important to retain the writable space invariant on the sectors.
- SectorFromAddress(write_address)->RemoveWritableBytes(result.size());
+ SectorDescriptor* const sector = SectorFromAddress(entry.address());
+ sector->RemoveWritableBytes(result.size());
if (!result.ok()) {
ERR("Failed to write %zu bytes at %#zx. %zu actually written",
entry.size(),
- size_t(write_address),
+ size_t(entry.address()),
result.size());
return result.status();
}
@@ -750,13 +749,15 @@
TRY(entry.VerifyChecksumInFlash());
}
+ sector->AddValidBytes(result.size());
return Status::OK;
}
-Status KeyValueStore::RelocateEntry(KeyDescriptor* key_descriptor,
- KeyValueStore::Address old_address) {
+Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
+ KeyValueStore::Address& address,
+ span<const Address> addresses_to_skip) {
Entry entry;
- TRY(Entry::Read(partition_, old_address, formats_, &entry));
+ TRY(Entry::Read(partition_, address, formats_, &entry));
// Find a new sector for the entry and write it to the new location. For
// relocation the find should not not be a sector already containing the key
@@ -766,92 +767,42 @@
// an immediate extra relocation).
SectorDescriptor* new_sector;
- TRY(FindSectorWithSpace(
- &new_sector, entry.size(), kGarbageCollect, key_descriptor->addresses()));
- const Address new_address = NextWritableAddress(new_sector);
- TRY(MoveEntry(new_address, entry));
+ // Avoid both this entry's sectors and sectors in addresses_to_skip.
+ //
+ // This is overly strict. addresses_to_skip is populated when there are
+ // sectors reserved for a new entry. It is safe to garbage collect into these
+ // sectors, as long as there remains room for the pending entry. These
+ // reserved sectors could also be garbage collected if they have recoverable
+ // space.
+ // TODO(hepler): Look into improving garbage collection.
+ //
+ // TODO: The derived class should allocate space for this address list.
+ Vector<Address, internal::kEntryRedundancy* 2> all_addresses_to_skip =
+ key_descriptor.addresses();
+ for (Address address : addresses_to_skip) {
+ if (!Contains(all_addresses_to_skip, address)) {
+ // TODO: DCHECK(!all_addresses_to_skip.full)
+ all_addresses_to_skip.push_back(address);
+ }
+ }
- // TODO: Perhaps add check that the entry matches the key descriptor (key
- // hash, ID, checksum).
+ TRY(FindSectorWithSpace(
+ &new_sector, entry.size(), kGarbageCollect, all_addresses_to_skip));
+
+ const Address new_address = NextWritableAddress(new_sector);
+ const StatusWithSize result = entry.Copy(new_address);
+ new_sector->RemoveWritableBytes(result.size());
+ TRY(result);
// Entry was written successfully; update the key descriptor and the sector
// descriptors to reflect the new entry.
- TRY(key_descriptor->UpdateAddress(old_address, new_address));
-
- if ((key_descriptor >= key_descriptors_.begin()) &&
- (key_descriptor < key_descriptors_.end())) {
- // If the key_descriptor is in the main key_descriptors_ list, it has been
- // accounted for in valid bytes, so only do valid byte updates for those
- // descriptors.
- new_sector->AddValidBytes(entry.size());
- SectorFromAddress(old_address)->RemoveValidBytes(entry.size());
- }
+ SectorFromAddress(address)->RemoveValidBytes(result.size());
+ new_sector->AddValidBytes(result.size());
+ address = new_address;
return Status::OK;
}
-Status KeyValueStore::MoveEntry(Address new_address, Entry& entry) {
- // Step 1: Read the old entry.
- struct TempEntry {
- Entry::KeyBuffer key;
- std::array<byte, sizeof(working_buffer_) - sizeof(key)> value;
- };
- auto [key_buffer, value_buffer] =
- *std::launder(reinterpret_cast<TempEntry*>(working_buffer_.data()));
-
- // Read the entry to be relocated. Store the entry in a local variable and
- // store the key and value in the TempEntry stored in the static allocated
- // working_buffer_.
- TRY_ASSIGN(size_t key_length, entry.ReadKey(key_buffer));
- string_view key = string_view(key_buffer.data(), key_length);
-
- StatusWithSize result = entry.ReadValue(value_buffer);
- if (!result.ok()) {
- return Status::INTERNAL;
- }
-
- const span value = span(value_buffer.data(), result.size());
- TRY(entry.VerifyChecksum(key, value));
-
- DBG("Moving %zu B entry with transaction ID %zu to to sector %u address %#zx",
- entry.size(),
- size_t(entry.transaction_id()),
- SectorIndex(SectorFromAddress(new_address)),
- size_t(new_address));
-
- // Step 2: Write the entry to the new location.
- entry.set_address(new_address);
- result = entry.Write(key, value);
-
- // Remove any bytes that were written, even if the write was not successful.
- // This is important to retain the writable space invariant on the sectors.
- SectorFromAddress(new_address)->RemoveWritableBytes(result.size());
-
- if (!result.ok()) {
- ERR("Failed to write %zu bytes at %" PRIx32 ". %zu actually written",
- entry.size(),
- new_address,
- result.size());
- return result.status();
- }
-
- // Step 3: Verify write to the new location.
- if (options_.verify_on_write) {
- TRY(entry.VerifyChecksumInFlash());
- }
-
- return Status::OK;
-}
-
-// Find if search_set contains value.
-// TODO: At some point move this to pw_containers, along with adding tests for
-// it.
-template <typename Container, typename T>
-bool Contains(const Container& search_set, const T& value) {
- return std::find(std::begin(search_set), std::end(search_set), value) !=
- std::end(search_set);
-}
-
// Find either an existing sector with enough space that is not the sector to
// skip, or an empty sector. Maintains the invariant that there is always at
// least 1 empty sector except during GC. On GC, skip sectors that have
@@ -869,8 +820,9 @@
const size_t sector_size_bytes = partition_.sector_size_bytes();
// Build a vector of sectors to avoid.
- Vector<SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
- for (auto& address : addresses_to_skip) {
+ // TODO(hepler): Consolidate the different address / sector lists.
+ Vector<SectorDescriptor*, internal::kEntryRedundancy * 2> sectors_to_skip;
+ for (Address address : addresses_to_skip) {
sectors_to_skip.push_back(SectorFromAddress(address));
}
@@ -979,6 +931,7 @@
size_t candidate_bytes = 0;
// Build a vector of sectors to avoid.
+ // TODO(hepler): Consolidate the three address / sector lists.
Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
for (auto& address : addresses_to_avoid) {
sectors_to_skip.push_back(SectorFromAddress(address));
@@ -1047,7 +1000,7 @@
}
if (sector->RecoverableBytes(partition_.sector_size_bytes()) > 0) {
- TRY(GarbageCollectSector(sector));
+ TRY(GarbageCollectSector(sector, {}));
}
}
@@ -1072,45 +1025,32 @@
}
// Step 2: Garbage collect the selected sector.
- TRY(GarbageCollectSector(sector_to_gc, nullptr));
- return Status::OK;
+ return GarbageCollectSector(sector_to_gc, addresses_to_skip);
}
Status KeyValueStore::RelocateKeyAddressesInSector(
- internal::SectorDescriptor* sector_to_gc,
- internal::KeyDescriptor* descriptor) {
- for (auto address : descriptor->addresses()) {
- if (AddressInSector(*sector_to_gc, address)) {
- DBG(" Relocate entry for Key 0x%08zx, address %zu",
- size_t(descriptor->hash()),
- size_t(address));
- TRY(RelocateEntry(descriptor, address));
+ internal::SectorDescriptor& sector_to_gc,
+ internal::KeyDescriptor& descriptor,
+ span<const Address> addresses_to_skip) {
+ for (FlashPartition::Address& address : descriptor.addresses()) {
+ if (AddressInSector(sector_to_gc, address)) {
+ DBG(" Relocate entry for Key 0x%08zx, sector %u",
+ size_t(descriptor.hash()),
+ SectorIndex(SectorFromAddress(address)));
+ TRY(RelocateEntry(descriptor, address, addresses_to_skip));
}
}
+
return Status::OK;
};
-Status KeyValueStore::GarbageCollectSector(SectorDescriptor* sector_to_gc,
- KeyDescriptor* key_in_progress) {
- // Pre-step: Check if the key in progress has any addresses in the sector to
- // GC.
- bool key_in_progress_in_sector = false;
- if (key_in_progress != nullptr) {
- for (Address address : key_in_progress->addresses()) {
- if (AddressInSector(*sector_to_gc, address)) {
- key_in_progress_in_sector = true;
- break;
- }
- }
- }
-
+Status KeyValueStore::GarbageCollectSector(
+ SectorDescriptor* sector_to_gc, span<const Address> addresses_to_skip) {
// Step 1: Move any valid entries in the GC sector to other sectors
- if ((sector_to_gc->valid_bytes() != 0 || key_in_progress_in_sector)) {
- if (key_in_progress != nullptr) {
- TRY(RelocateKeyAddressesInSector(sector_to_gc, key_in_progress));
- }
- for (auto& descriptor : key_descriptors_) {
- TRY(RelocateKeyAddressesInSector(sector_to_gc, &descriptor));
+ if (sector_to_gc->valid_bytes() != 0) {
+ for (KeyDescriptor& descriptor : key_descriptors_) {
+ TRY(RelocateKeyAddressesInSector(
+ *sector_to_gc, descriptor, addresses_to_skip));
}
}
@@ -1131,7 +1071,7 @@
}
KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
- std::string_view key,
+ string_view key,
span<const byte> value,
KeyDescriptor::State state) {
// Always bump the transaction ID when creating a new entry.
diff --git a/pw_kvs/public/pw_kvs/internal/entry.h b/pw_kvs/public/pw_kvs/internal/entry.h
index f5a74ae..c2f9cf7 100644
--- a/pw_kvs/public/pw_kvs/internal/entry.h
+++ b/pw_kvs/public/pw_kvs/internal/entry.h
@@ -90,7 +90,15 @@
return KeyDescriptor(
key,
transaction_id(),
- address_,
+ address(),
+ deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
+ }
+
+ KeyDescriptor descriptor(uint32_t key_hash) const {
+ return KeyDescriptor(
+ key_hash,
+ transaction_id(),
+ address(),
deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
}
diff --git a/pw_kvs/public/pw_kvs/internal/key_descriptor.h b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
index aa4b580..aa915f9 100644
--- a/pw_kvs/public/pw_kvs/internal/key_descriptor.h
+++ b/pw_kvs/public/pw_kvs/internal/key_descriptor.h
@@ -31,15 +31,6 @@
public:
enum State { kValid, kDeleted };
- KeyDescriptor(std::string_view key) : KeyDescriptor(key, 0, 0, kValid) {}
- KeyDescriptor(const KeyDescriptor& kd)
- : key_hash_(kd.key_hash_),
- transaction_id_(kd.transaction_id_),
- addresses_(kd.addresses_.begin(), kd.addresses_.end()),
- state_(kd.state_) {}
-
- KeyDescriptor& operator=(const KeyDescriptor& other) = default;
-
uint32_t hash() const { return key_hash_; }
uint32_t transaction_id() const { return transaction_id_; }
@@ -79,13 +70,19 @@
private:
friend class Entry;
+ KeyDescriptor(uint32_t key_hash,
+ uint32_t version,
+ FlashPartition::Address address,
+ State initial_state)
+ : key_hash_(key_hash), transaction_id_(version), state_(initial_state) {
+ addresses_.assign(1, address);
+ }
+
KeyDescriptor(std::string_view key,
uint32_t version,
FlashPartition::Address address,
State initial_state)
- : key_hash_(Hash(key)), transaction_id_(version), state_(initial_state) {
- addresses_.assign(1, address);
- }
+ : KeyDescriptor(Hash(key), version, address, initial_state) {}
uint32_t key_hash_;
uint32_t transaction_id_;
diff --git a/pw_kvs/public/pw_kvs/key_value_store.h b/pw_kvs/public/pw_kvs/key_value_store.h
index 53614ed..9888e7f 100644
--- a/pw_kvs/public/pw_kvs/key_value_store.h
+++ b/pw_kvs/public/pw_kvs/key_value_store.h
@@ -369,22 +369,28 @@
Status WriteEntryForNewKey(std::string_view key, span<const std::byte> value);
- Status WriteEntry(KeyDescriptor* key_descriptor,
- std::string_view key,
+ Status WriteEntry(std::string_view key,
span<const std::byte> value,
- KeyDescriptor::State new_state);
+ KeyDescriptor::State new_state,
+ KeyDescriptor* prior_descriptor = nullptr,
+ size_t prior_size = 0);
+
+ KeyDescriptor& UpdateKeyDescriptor(const Entry& new_entry,
+ std::string_view key,
+ KeyDescriptor* prior_descriptor,
+ size_t prior_size);
Status GetSectorForWrite(SectorDescriptor** sector,
size_t entry_size,
span<const Address> addresses_to_skip);
- Status AppendEntry(Address write_address,
- Entry& entry,
+ Status AppendEntry(const Entry& entry,
std::string_view key,
span<const std::byte> value);
- Status RelocateEntry(KeyDescriptor* key_descriptor,
- KeyValueStore::Address address);
+ Status RelocateEntry(KeyDescriptor& key_descriptor,
+ KeyValueStore::Address& address,
+ span<const Address> addresses_to_skip);
Status MoveEntry(Address new_address, Entry& entry);
@@ -400,11 +406,12 @@
Status GarbageCollectPartial(span<const Address> addresses_to_skip);
- Status RelocateKeyAddressesInSector(internal::SectorDescriptor* sector_to_gc,
- internal::KeyDescriptor* descriptor);
+ Status RelocateKeyAddressesInSector(internal::SectorDescriptor& sector_to_gc,
+ internal::KeyDescriptor& descriptor,
+ span<const Address> addresses_to_skip);
Status GarbageCollectSector(SectorDescriptor* sector_to_gc,
- KeyDescriptor* key_in_progress = nullptr);
+ span<const Address> addresses_to_skip);
bool AddressInSector(const SectorDescriptor& sector, Address address) const {
const Address sector_base = SectorBaseAddress(§or);
@@ -469,11 +476,6 @@
// because SectorDescriptor* is the standard way to identify a sector.
SectorDescriptor* last_new_sector_;
uint32_t last_transaction_id_;
-
- // Working buffer is a general purpose buffer for operations (such as init or
- // relocate) to use for working space to remove the need to allocate temporary
- // space.
- std::array<std::byte, kWorkingBufferSizeBytes> working_buffer_;
};
template <size_t kMaxEntries,