Release absl::CordBuffer

absl::CordBuffer holds data for eventual inclusion within an existing
absl::Cord. CordBuffers are useful for building large Cords that may
require custom allocation of its associated memory, a pattern that is
common in zero-copy APIs.
PiperOrigin-RevId: 453212229
Change-Id: I6a8adc3a8d206691cb1b0001a9161e5080dd1c5f
diff --git a/CMake/AbseilDll.cmake b/CMake/AbseilDll.cmake
index 0b5f0a5..f81cb0a 100644
--- a/CMake/AbseilDll.cmake
+++ b/CMake/AbseilDll.cmake
@@ -194,9 +194,11 @@
   "strings/charconv.cc"
   "strings/charconv.h"
   "strings/cord.cc"
+  "strings/cord.h"
   "strings/cord_analysis.cc"
   "strings/cord_analysis.h"
-  "strings/cord.h"
+  "strings/cord_buffer.cc"
+  "strings/cord_buffer.h"
   "strings/escaping.cc"
   "strings/escaping.h"
   "strings/internal/charconv_bigint.cc"
diff --git a/absl/strings/BUILD.bazel b/absl/strings/BUILD.bazel
index e909216..81f5a58 100644
--- a/absl/strings/BUILD.bazel
+++ b/absl/strings/BUILD.bazel
@@ -415,9 +415,11 @@
         "cord.cc",
         "cord_analysis.cc",
         "cord_analysis.h",
+        "cord_buffer.cc",
     ],
     hdrs = [
         "cord.h",
+        "cord_buffer.h",
     ],
     copts = ABSL_DEFAULT_COPTS,
     deps = [
@@ -702,6 +704,22 @@
 )
 
 cc_test(
+    name = "cord_buffer_test",
+    size = "small",
+    srcs = ["cord_buffer_test.cc"],
+    copts = ABSL_TEST_COPTS,
+    visibility = ["//visibility:private"],
+    deps = [
+        ":cord",
+        ":cord_internal",
+        ":cord_rep_test_util",
+        "//absl/base:config",
+        "//absl/types:span",
+        "@com_google_googletest//:gtest_main",
+    ],
+)
+
+cc_test(
     name = "cord_test",
     size = "medium",
     srcs = ["cord_test.cc"],
diff --git a/absl/strings/CMakeLists.txt b/absl/strings/CMakeLists.txt
index a1b8d6e..bb07330 100644
--- a/absl/strings/CMakeLists.txt
+++ b/absl/strings/CMakeLists.txt
@@ -845,10 +845,12 @@
     cord
   HDRS
     "cord.h"
+    "cord_buffer.h"
   SRCS
     "cord.cc"
     "cord_analysis.cc"
     "cord_analysis.h"
+    "cord_buffer.cc"
   COPTS
     ${ABSL_DEFAULT_COPTS}
   DEPS
diff --git a/absl/strings/cord.cc b/absl/strings/cord.cc
index c6ec1ec..04dc7b9 100644
--- a/absl/strings/cord.cc
+++ b/absl/strings/cord.cc
@@ -34,6 +34,7 @@
 #include "absl/base/port.h"
 #include "absl/container/fixed_array.h"
 #include "absl/container/inlined_vector.h"
+#include "absl/strings/cord_buffer.h"
 #include "absl/strings/escaping.h"
 #include "absl/strings/internal/cord_data_edge.h"
 #include "absl/strings/internal/cord_internal.h"
@@ -520,6 +521,46 @@
   contents_.AppendTree(rep, CordzUpdateTracker::kAppendCord);
 }
 
+static CordRep::ExtractResult ExtractAppendBuffer(CordRep* rep,
+                                                  size_t min_capacity) {
+  switch (rep->tag) {
+    case cord_internal::BTREE:
+      return CordRepBtree::ExtractAppendBuffer(rep->btree(), min_capacity);
+    default:
+      if (rep->IsFlat() && rep->refcount.IsOne() &&
+          rep->flat()->Capacity() - rep->length >= min_capacity) {
+        return {nullptr, rep};
+      }
+      return {rep, nullptr};
+  }
+}
+
+static CordBuffer CreateAppendBuffer(InlineData& data, size_t capacity) {
+  // Watch out for overflow, people can ask for size_t::max().
+  const size_t size = data.inline_size();
+  capacity = (std::min)(std::numeric_limits<size_t>::max() - size, capacity);
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(size + capacity);
+  cord_internal::SmallMemmove(buffer.data(), data.as_chars(), size);
+  buffer.SetLength(size);
+  data = {};
+  return buffer;
+}
+
+CordBuffer Cord::GetAppendBufferSlowPath(size_t capacity, size_t min_capacity) {
+  auto constexpr method = CordzUpdateTracker::kGetAppendBuffer;
+  CordRep* tree = contents_.tree();
+  if (tree != nullptr) {
+    CordzUpdateScope scope(contents_.cordz_info(), method);
+    CordRep::ExtractResult result = ExtractAppendBuffer(tree, min_capacity);
+    if (result.extracted != nullptr) {
+      contents_.SetTreeOrEmpty(result.tree, scope);
+      return CordBuffer(result.extracted->flat());
+    }
+    return CordBuffer::CreateWithDefaultLimit(capacity);
+  }
+  return CreateAppendBuffer(contents_.data_, capacity);
+}
+
 void Cord::Append(const Cord& src) {
   AppendImpl(src);
 }
@@ -572,6 +613,33 @@
   contents_.PrependTree(rep, method);
 }
 
+void Cord::AppendPrecise(absl::string_view src, MethodIdentifier method) {
+  assert(!src.empty());
+  assert(src.size() <= cord_internal::kMaxFlatLength);
+  if (contents_.remaining_inline_capacity() >= src.size()) {
+    const size_t inline_length = contents_.inline_size();
+    memcpy(contents_.data_.as_chars() + inline_length, src.data(), src.size());
+    contents_.set_inline_size(inline_length + src.size());
+  } else {
+    contents_.AppendTree(CordRepFlat::Create(src), method);
+  }
+}
+
+void Cord::PrependPrecise(absl::string_view src, MethodIdentifier method) {
+  assert(!src.empty());
+  assert(src.size() <= cord_internal::kMaxFlatLength);
+  if (contents_.remaining_inline_capacity() >= src.size()) {
+    const size_t inline_length = contents_.inline_size();
+    char data[InlineRep::kMaxInline + 1] = {0};
+    memcpy(data, src.data(), src.size());
+    memcpy(data + src.size(), contents_.data(), inline_length);
+    memcpy(contents_.data_.as_chars(), data, InlineRep::kMaxInline + 1);
+    contents_.set_inline_size(inline_length + src.size());
+  } else {
+    contents_.PrependTree(CordRepFlat::Create(src), method);
+  }
+}
+
 template <typename T, Cord::EnableIfString<T>>
 inline void Cord::Prepend(T&& src) {
   if (src.size() <= kMaxBytesToCopy) {
diff --git a/absl/strings/cord.h b/absl/strings/cord.h
index 391b2c0..f378dc0 100644
--- a/absl/strings/cord.h
+++ b/absl/strings/cord.h
@@ -80,6 +80,7 @@
 #include "absl/functional/function_ref.h"
 #include "absl/meta/type_traits.h"
 #include "absl/strings/cord_analysis.h"
+#include "absl/strings/cord_buffer.h"
 #include "absl/strings/internal/cord_data_edge.h"
 #include "absl/strings/internal/cord_internal.h"
 #include "absl/strings/internal/cord_rep_btree.h"
@@ -244,6 +245,45 @@
   template <typename T, EnableIfString<T> = 0>
   void Append(T&& src);
 
+  // Appends `buffer` to this cord, unless `buffer` has a zero length in which
+  // case this method has no effect on this cord instance.
+  // This method is guaranteed to consume `buffer`.
+  void Append(CordBuffer buffer);
+
+  // Returns a CordBuffer, re-using potential existing capacity in this cord.
+  //
+  // Cord instances may have additional unused capacity in the last (or first)
+  // nodes of the underlying tree to facilitate amortized growth. This method
+  // allows applications to explicitly use this spare capacity if available,
+  // or create a new CordBuffer instance otherwise.
+  // If this cord has a final non-shared node with at least `min_capacity`
+  // available, then this method will return that buffer including its data
+  // contents. I.e.; the returned buffer will have a non-zero length, and
+  // a capacity of at least `buffer.length + min_capacity`. Otherwise, this
+  // method will return `CordBuffer::CreateWithDefaultLimit(capacity)`.
+  //
+  // Below an example of using GetAppendBuffer. Notice that in this example we
+  // use `GetAppendBuffer()` only on the first iteration. As we know nothing
+  // about any initial extra capacity in `cord`, we may be able to use the extra
+  // capacity. But as we add new buffers with fully utilized contents after that
+  // we avoid calling `GetAppendBuffer()` on subsequent iterations: while this
+  // works fine, it results in an unnecessary inspection of cord contents:
+  //
+  //   void AppendRandomDataToCord(absl::Cord &cord, size_t n) {
+  //     bool first = true;
+  //     while (n > 0) {
+  //       CordBuffer buffer = first ? cord.GetAppendBuffer(n)
+  //                                 : CordBuffer::CreateWithDefaultLimit(n);
+  //       absl::Span<char> data = buffer.available_up_to(n);
+  //       FillRandomValues(data.data(), data.size());
+  //       buffer.IncreaseLengthBy(data.size());
+  //       cord.Append(std::move(buffer));
+  //       n -= data.size();
+  //       first = false;
+  //     }
+  //   }
+  CordBuffer GetAppendBuffer(size_t capacity, size_t min_capacity = 16);
+
   // Cord::Prepend()
   //
   // Prepends data to the Cord, which may come from another Cord or other string
@@ -253,6 +293,11 @@
   template <typename T, EnableIfString<T> = 0>
   void Prepend(T&& src);
 
+  // Prepends `buffer` to this cord, unless `buffer` has a zero length in which
+  // case this method has no effect on this cord instance.
+  // This method is guaranteed to consume `buffer`.
+  void Prepend(CordBuffer buffer);
+
   // Cord::RemovePrefix()
   //
   // Removes the first `n` bytes of a Cord.
@@ -928,6 +973,15 @@
   template <typename C>
   void AppendImpl(C&& src);
 
+  // Appends / Prepends `src` to this instance, using precise sizing.
+  // This method does explicitly not attempt to use any spare capacity
+  // in any pending last added private owned flat.
+  // Requires `src` to be <= kMaxFlatLength.
+  void AppendPrecise(absl::string_view src, MethodIdentifier method);
+  void PrependPrecise(absl::string_view src, MethodIdentifier method);
+
+  CordBuffer GetAppendBufferSlowPath(size_t capacity, size_t min_capacity);
+
   // Prepends the provided data to this instance. `method` contains the public
   // API method for this action which is tracked for Cordz sampling purposes.
   void PrependArray(absl::string_view src, MethodIdentifier method);
@@ -1284,6 +1338,31 @@
   PrependArray(src, CordzUpdateTracker::kPrependString);
 }
 
+inline void Cord::Append(CordBuffer buffer) {
+  if (ABSL_PREDICT_FALSE(buffer.length() == 0)) return;
+  absl::string_view short_value;
+  if (CordRep* rep = buffer.ConsumeValue(short_value)) {
+    contents_.AppendTree(rep, CordzUpdateTracker::kAppendCordBuffer);
+  } else {
+    AppendPrecise(short_value, CordzUpdateTracker::kAppendCordBuffer);
+  }
+}
+
+inline void Cord::Prepend(CordBuffer buffer) {
+  if (ABSL_PREDICT_FALSE(buffer.length() == 0)) return;
+  absl::string_view short_value;
+  if (CordRep* rep = buffer.ConsumeValue(short_value)) {
+    contents_.PrependTree(rep, CordzUpdateTracker::kPrependCordBuffer);
+  } else {
+    PrependPrecise(short_value, CordzUpdateTracker::kPrependCordBuffer);
+  }
+}
+
+inline CordBuffer Cord::GetAppendBuffer(size_t capacity, size_t min_capacity) {
+  if (empty()) return CordBuffer::CreateWithDefaultLimit(capacity);
+  return GetAppendBufferSlowPath(capacity, min_capacity);
+}
+
 extern template void Cord::Append(std::string&& src);
 extern template void Cord::Prepend(std::string&& src);
 
diff --git a/absl/strings/cord_buffer.cc b/absl/strings/cord_buffer.cc
new file mode 100644
index 0000000..fd4045b
--- /dev/null
+++ b/absl/strings/cord_buffer.cc
@@ -0,0 +1,28 @@
+// Copyright 2022 The Abseil Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "absl/strings/cord_buffer.h"
+
+#include <cstddef>
+
+#include "absl/base/config.h"
+
+namespace absl {
+ABSL_NAMESPACE_BEGIN
+
+constexpr size_t CordBuffer::kDefaultLimit;
+constexpr size_t CordBuffer::kCustomLimit;
+
+ABSL_NAMESPACE_END
+}  // namespace absl
diff --git a/absl/strings/cord_buffer.h b/absl/strings/cord_buffer.h
new file mode 100644
index 0000000..4f4bdc0
--- /dev/null
+++ b/absl/strings/cord_buffer.h
@@ -0,0 +1,571 @@
+// Copyright 2021 The Abseil Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// -----------------------------------------------------------------------------
+// File: cord_buffer.h
+// -----------------------------------------------------------------------------
+//
+// This file defines an `absl::CordBuffer` data structure to hold data for
+// eventual inclusion within an existing `Cord` data structure. Cord buffers are
+// useful for building large Cords that may require custom allocation of its
+// associated memory.
+//
+#ifndef ABSL_STRINGS_CORD_BUFFER_H_
+#define ABSL_STRINGS_CORD_BUFFER_H_
+
+#include <algorithm>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "absl/base/config.h"
+#include "absl/numeric/bits.h"
+#include "absl/strings/internal/cord_internal.h"
+#include "absl/strings/internal/cord_rep_flat.h"
+#include "absl/types/span.h"
+
+namespace absl {
+ABSL_NAMESPACE_BEGIN
+
+class Cord;
+class CordBufferTestPeer;
+
+// CordBuffer
+//
+// CordBuffer manages memory buffers for purposes such as zero-copy APIs as well
+// as applications building cords with large data requiring granular control
+// over the allocation and size of cord data. For example, a function creating
+// a cord of random data could use a CordBuffer as follows:
+//
+//   absl::Cord CreateRandomCord(size_t length) {
+//     absl::Cord cord;
+//     while (length > 0) {
+//       CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(length);
+//       absl::Span<char> data = buffer.available_up_to(length);
+//       FillRandomValues(data.data(), data.size());
+//       buffer.IncreaseLengthBy(data.size());
+//       cord.Append(std::move(buffer));
+//       length -= data.size();
+//     }
+//     return cord;
+//   }
+//
+// CordBuffer instances are by default limited to a capacity of `kDefaultLimit`
+// bytes. `kDefaultLimit` is currently just under 4KiB, but this default may
+// change in the future and/or for specific architectures. The default limit is
+// aimed to provide a good trade-off between performance and memory overhead.
+// Smaller buffers typically incur more compute cost while larger buffers are
+// more CPU efficient but create significant memory overhead because of such
+// allocations being less granular. Using larger buffers may also increase the
+// risk of memory fragmentation.
+//
+// Applications create a buffer using one of the `CreateWithDefaultLimit()` or
+// `CreateWithCustomLimit()` methods. The returned instance will have a non-zero
+// capacity and a zero length. Applications use the `data()` method to set the
+// contents of the managed memory, and once done filling the buffer, use the
+// `IncreaseLengthBy()` or 'SetLength()' method to specify the length of the
+// initialized data before adding the buffer to a Cord.
+//
+// The `CreateWithCustomLimit()` method is intended for applications needing
+// larger buffers than the default memory limit, allowing the allocation of up
+// to a capacity of `kCustomLimit` bytes minus some minimum internal overhead.
+// The usage of `CreateWithCustomLimit()` should be limited to only those use
+// cases where the distribution of the input is relatively well known, and/or
+// where the trade-off between the efficiency gains outweigh the risk of memory
+// fragmentation. See the documentation for `CreateWithCustomLimit()` for more
+// information on using larger custom limits.
+//
+// The capacity of a `CordBuffer` returned by one of the `Create` methods may
+// be larger than the requested capacity due to rounding, alignment and
+// granularity of the memory allocator. Applications should use the `capacity`
+// method to obtain the effective capacity of the returned instance as
+// demonstrated in the provided example above.
+//
+// CordBuffer is a move-only class. All references into the managed memory are
+// invalidated when an instance is moved into either another CordBuffer instance
+// or a Cord. Writing to a location obtained by a previous call to `data()`
+// after an instance was moved will lead to undefined behavior.
+//
+// A `moved from` CordBuffer instance will have a valid, but empty state.
+// CordBuffer is thread compatible.
+class CordBuffer {
+ public:
+  // kDefaultLimit
+  //
+  // Default capacity limits of allocated CordBuffers.
+  // See the class comments for more information on allocation limits.
+  static constexpr size_t kDefaultLimit = cord_internal::kMaxFlatLength;
+
+  // kCustomLimit
+  //
+  // Maximum size for CreateWithCustomLimit() allocated buffers.
+  // Note that the effective capacity may be slightly less
+  // because of internal overhead of internal cord buffers.
+  static constexpr size_t kCustomLimit = 64U << 10;
+
+  // Constructors, Destructors and Assignment Operators
+
+  // Creates an empty CordBuffer.
+  CordBuffer() = default;
+
+  // Destroys this CordBuffer instance and, if not empty, releases any memory
+  // managed by this instance, invalidating previously returned references.
+  ~CordBuffer();
+
+  // CordBuffer is move-only
+  CordBuffer(CordBuffer&& rhs) noexcept;
+  CordBuffer& operator=(CordBuffer&&) noexcept;
+  CordBuffer(const CordBuffer&) = delete;
+  CordBuffer& operator=(const CordBuffer&) = delete;
+
+  // CordBuffer::MaximumPayload()
+  //
+  // Returns the guaranteed maximum payload for a CordBuffer returned by the
+  // `CreateWithDefaultLimit()` method. While small, each internal buffer inside
+  // a Cord incurs an overhead to manage the length, type and reference count
+  // for the buffer managed inside the cord tree. Applications can use this
+  // method to get approximate number of buffers required for a given byte
+  // size, etc.
+  //
+  // For example:
+  //   const size_t payload = absl::CordBuffer::MaximumPayload();
+  //   const size_t buffer_count = (total_size + payload - 1) / payload;
+  //   buffers.reserve(buffer_count);
+  static constexpr size_t MaximumPayload();
+
+  // Overload to the above `MaximumPayload()` except that it returns the
+  // maximum payload for a CordBuffer returned by the `CreateWithCustomLimit()`
+  // method given the provided `block_size`.
+  static constexpr size_t MaximumPayload(size_t block_size);
+
+  // CordBuffer::CreateWithDefaultLimit()
+  //
+  // Creates a CordBuffer instance of the desired `capacity`, capped at the
+  // default limit `kDefaultLimit`. The returned buffer has a guaranteed
+  // capacity of at least `min(kDefaultLimit, capacity)`. See the class comments
+  // for more information on buffer capacities and intended usage.
+  static CordBuffer CreateWithDefaultLimit(size_t capacity);
+
+
+  // CordBuffer::CreateWithCustomLimit()
+  //
+  // Creates a CordBuffer instance of the desired `capacity` rounded to an
+  // appropriate power of 2 size less than, or equal to `block_size`.
+  // Requires `block_size` to be a power of 2.
+  //
+  // If `capacity` is less than or equal to `kDefaultLimit`, then this method
+  // behaves identical to `CreateWithDefaultLimit`, which means that the caller
+  // is guaranteed to get a buffer of at least the requested capacity.
+  //
+  // If `capacity` is greater than or equal to `block_size`, then this method
+  // returns a buffer with an `allocated size` of `block_size` bytes. Otherwise,
+  // this methods returns a buffer with a suitable smaller power of 2 block size
+  // to satisfy the request. The actual size depends on a number of factors, and
+  // is typically (but not necessarily) the highest or second highest power of 2
+  // value less than or equal to `capacity`.
+  //
+  // The 'allocated size' includes a small amount of overhead required for
+  // internal state, which is currently 13 bytes on 64-bit platforms. For
+  // example: a buffer created with `block_size` and `capacity' set to 8KiB
+  // will have an allocated size of 8KiB, and an effective internal `capacity`
+  // of 8KiB - 13 = 8179 bytes.
+  //
+  // To demonstrate this in practice, let's assume we want to read data from
+  // somewhat larger files using approximately 64KiB buffers:
+  //
+  //   absl::Cord ReadFromFile(int fd, size_t n) {
+  //     absl::Cord cord;
+  //     while (n > 0) {
+  //       CordBuffer buffer = CordBuffer::CreateWithCustomLimit(64 << 10, n);
+  //       absl::Span<char> data = buffer.available_up_to(n);
+  //       ReadFileDataOrDie(fd, data.data(), data.size());
+  //       buffer.IncreaseLengthBy(data.size());
+  //       cord.Append(std::move(buffer));
+  //       n -= data.size();
+  //     }
+  //     return cord;
+  //   }
+  //
+  // If we'd use this function to read a file of 659KiB, we may get the
+  // following pattern of allocated cord buffer sizes:
+  //
+  //   CreateWithCustomLimit(64KiB, 674816) --> ~64KiB (65523)
+  //   CreateWithCustomLimit(64KiB, 674816) --> ~64KiB (65523)
+  //   ...
+  //   CreateWithCustomLimit(64KiB,  19586) --> ~16KiB (16371)
+  //   CreateWithCustomLimit(64KiB,   3215) -->   3215 (at least 3215)
+  //
+  // The reason the method returns a 16K buffer instead of a roughly 19K buffer
+  // is to reduce memory overhead and fragmentation risks. Using carefully
+  // chosen power of 2 values reduces the entropy of allocated memory sizes.
+  //
+  // Additionally, let's assume we'd use the above function on files that are
+  // generally smaller than 64K. If we'd use 'precise' sized buffers for such
+  // files, than we'd get a very wide distribution of allocated memory sizes
+  // rounded to 4K page sizes, and we'd end up with a lot of unused capacity.
+  //
+  // In general, application should only use custom sizes if the data they are
+  // consuming or storing is expected to be many times the chosen block size,
+  // and be based on objective data and performance metrics. For example, a
+  // compress function may work faster and consume less CPU when using larger
+  // buffers. Such an application should pick a size offering a reasonable
+  // trade-off between expected data size, compute savings with larger buffers,
+  // and the cost or fragmentation effect of larger buffers.
+  // Applications must pick a reasonable spot on that curve, and make sure their
+  // data meets their expectations in size distributions such as "mostly large".
+  static CordBuffer CreateWithCustomLimit(size_t block_size, size_t capacity);
+
+  // CordBuffer::available()
+  //
+  // Returns the span delineating the available capacity in this buffer
+  // which is defined as `{ data() + length(), capacity() - length() }`.
+  absl::Span<char> available();
+
+  // CordBuffer::available_up_to()
+  //
+  // Returns the span delineating the available capacity in this buffer limited
+  // to `size` bytes. This is equivalent to `available().subspan(0, size)`.
+  absl::Span<char> available_up_to(size_t size);
+
+  // CordBuffer::data()
+  //
+  // Returns a non-null reference to the data managed by this instance.
+  // Applications are allowed to write up to `capacity` bytes of instance data.
+  // CordBuffer data is uninitialized by default. Reading data from an instance
+  // that has not yet been initialized will lead to undefined behavior.
+  char* data();
+  const char* data() const;
+
+  // CordBuffer::length()
+  //
+  // Returns the length of this instance. The default length of a CordBuffer is
+  // 0, indicating an 'empty' CordBuffer. Applications must specify the length
+  // of the data in a CordBuffer before adding it to a Cord.
+  size_t length() const;
+
+  // CordBuffer::capacity()
+  //
+  // Returns the capacity of this instance. All instances have a non-zero
+  // capacity: default and `moved from` instances have a small internal buffer.
+  size_t capacity() const;
+
+  // CordBuffer::IncreaseLengthBy()
+  //
+  // Increases the length of this buffer by the specified 'n' bytes.
+  // Applications must make sure all data in this buffer up to the new length
+  // has been initialized before adding a CordBuffer to a Cord: failure to do so
+  // will lead to undefined behavior.  Requires `length() + n <= capacity()`.
+  // Typically, applications will use 'available_up_to()` to get a span of the
+  // desired capacity, and use `span.size()` to increase the length as in:
+  //   absl::Span<char> span = buffer.available_up_to(desired);
+  //   buffer.IncreaseLengthBy(span.size());
+  //   memcpy(span.data(), src, span.size());
+  //   etc...
+  void IncreaseLengthBy(size_t n);
+
+  // CordBuffer::SetLength()
+  //
+  // Sets the data length of this instance. Applications must make sure all data
+  // of the specified length has been initialized before adding a CordBuffer to
+  // a Cord: failure to do so will lead to undefined behavior.
+  // Setting the length to a small value or zero does not release any memory
+  // held by this CordBuffer instance. Requires `length <= capacity()`.
+  // Applications should preferably use the `IncreaseLengthBy()` method above
+  // in combination with the 'available()` or `available_up_to()` methods.
+  void SetLength(size_t length);
+
+ private:
+  // Make sure we don't accidentally over promise.
+  static_assert(kCustomLimit <= cord_internal::kMaxLargeFlatSize, "");
+
+  // Assume the cost of an 'uprounded' allocation to CeilPow2(size) versus
+  // the cost of allocating at least 1 extra flat <= 4KB:
+  // - Flat overhead = 13 bytes
+  // - Btree amortized cost / node =~ 13 bytes
+  // - 64 byte granularity of tcmalloc at 4K =~ 32 byte average
+  // CPU cost and efficiency requires we should at least 'save' something by
+  // splitting, as a poor man's measure, we say the slop needs to be
+  // at least double the cost offset to make it worth splitting: ~128 bytes.
+  static constexpr size_t kMaxPageSlop = 128;
+
+  // Overhead for allocation a flat.
+  static constexpr size_t kOverhead = cord_internal::kFlatOverhead;
+
+  using CordRepFlat = cord_internal::CordRepFlat;
+
+  // `Rep` is the internal data representation of a CordBuffer. The internal
+  // representation has an internal small size optimization similar to
+  // std::string (SSO).
+  struct Rep {
+    // Inline SSO size of a CordBuffer
+    static constexpr size_t kInlineCapacity = sizeof(intptr_t) * 2 - 1;
+
+    // Creates a default instance with kInlineCapacity.
+    Rep() : short_rep{} {}
+
+    // Creates an instance managing an allocated non zero CordRep.
+    explicit Rep(cord_internal::CordRepFlat* rep) : long_rep{rep} {
+      assert(rep != nullptr);
+    }
+
+    // Returns true if this instance manages the SSO internal buffer.
+    bool is_short() const {
+      constexpr size_t offset = offsetof(Short, raw_size);
+      return (reinterpret_cast<const char*>(this)[offset] & 1) != 0;
+    }
+
+    // Returns the available area of the internal SSO data
+    absl::Span<char> short_available() {
+      assert(is_short());
+      const size_t length = (short_rep.raw_size >> 1);
+      return absl::Span<char>(short_rep.data + length,
+                              kInlineCapacity - length);
+    }
+
+    // Returns the available area of the internal SSO data
+    absl::Span<char> long_available() {
+      assert(!is_short());
+      const size_t length = long_rep.rep->length;
+      return absl::Span<char>(long_rep.rep->Data() + length,
+                              long_rep.rep->Capacity() - length);
+    }
+
+    // Returns the length of the internal SSO data.
+    size_t short_length() const {
+      assert(is_short());
+      return short_rep.raw_size >> 1;
+    }
+
+    // Sets the length of the internal SSO data.
+    // Disregards any previously set CordRep instance.
+    void set_short_length(size_t length) {
+      short_rep.raw_size = static_cast<char>((length << 1) + 1);
+    }
+
+    // Adds `n` to the current short length.
+    void add_short_length(size_t n) {
+      assert(is_short());
+      short_rep.raw_size += static_cast<char>(n << 1);
+    }
+
+    // Returns reference to the internal SSO data buffer.
+    char* data() {
+      assert(is_short());
+      return short_rep.data;
+    }
+    const char* data() const {
+      assert(is_short());
+      return short_rep.data;
+    }
+
+    // Returns a pointer the external CordRep managed by this instance.
+    cord_internal::CordRepFlat* rep() const {
+      assert(!is_short());
+      return long_rep.rep;
+    }
+
+    // The internal representation takes advantage of the fact that allocated
+    // memory is always on an even address, and uses the least significant bit
+    // of the first or last byte (depending on endianness) as the inline size
+    // indicator overlapping with the least significant byte of the CordRep*.
+#if defined(ABSL_IS_BIG_ENDIAN)
+    struct Long {
+      explicit Long(cord_internal::CordRepFlat* rep_arg) : rep(rep_arg) {}
+      void* padding;
+      cord_internal::CordRepFlat* rep;
+    };
+    struct Short {
+      char data[sizeof(Long) - 1];
+      char raw_size = 1;
+    };
+#else
+    struct Long {
+      explicit Long(cord_internal::CordRepFlat* rep_arg) : rep(rep_arg) {}
+      cord_internal::CordRepFlat* rep;
+      void* padding;
+    };
+    struct Short {
+      char raw_size = 1;
+      char data[sizeof(Long) - 1];
+    };
+#endif
+
+    union {
+      Long long_rep;
+      Short short_rep;
+    };
+  };
+
+  // Power2 functions
+  static bool IsPow2(size_t size) { return absl::has_single_bit(size); }
+  static size_t Log2Floor(size_t size) { return absl::bit_width(size) - 1; }
+  static size_t Log2Ceil(size_t size) { return absl::bit_width(size - 1); }
+
+  // Implementation of `CreateWithCustomLimit()`.
+  // This implementation allows for future memory allocation hints to
+  // be passed down into the CordRepFlat allocation function.
+  template <typename... AllocationHints>
+  static CordBuffer CreateWithCustomLimitImpl(size_t block_size,
+                                              size_t capacity,
+                                              AllocationHints... hints);
+
+  // Consumes the value contained in this instance and resets the instance.
+  // This method returns a non-null Cordrep* if the current instances manages a
+  // CordRep*, and resets the instance to an empty SSO instance. If the current
+  // instance is an SSO instance, then this method returns nullptr and sets
+  // `short_value` to the inlined data value. In either case, the current
+  // instance length is reset to zero.
+  // This method is intended to be used by Cord internal functions only.
+  cord_internal::CordRep* ConsumeValue(absl::string_view& short_value) {
+    cord_internal::CordRep* rep = nullptr;
+    if (rep_.is_short()) {
+      short_value = absl::string_view(rep_.data(), rep_.short_length());
+    } else {
+      rep = rep_.rep();
+    }
+    rep_.set_short_length(0);
+    return rep;
+  }
+
+  // Internal constructor.
+  explicit CordBuffer(cord_internal::CordRepFlat* rep) : rep_(rep) {
+    assert(rep != nullptr);
+  }
+
+  Rep rep_;
+
+  friend class Cord;
+  friend class CordBufferTestPeer;
+};
+
+inline constexpr size_t CordBuffer::MaximumPayload() {
+  return cord_internal::kMaxFlatLength;
+}
+
+inline constexpr size_t CordBuffer::MaximumPayload(size_t block_size) {
+  // TODO(absl-team): Use std::min when C++11 support is dropped.
+  return (kCustomLimit < block_size ? kCustomLimit : block_size) -
+         cord_internal::kFlatOverhead;
+}
+
+inline CordBuffer CordBuffer::CreateWithDefaultLimit(size_t capacity) {
+  if (capacity > Rep::kInlineCapacity) {
+    auto* rep = cord_internal::CordRepFlat::New(capacity);
+    rep->length = 0;
+    return CordBuffer(rep);
+  }
+  return CordBuffer();
+}
+
+template <typename... AllocationHints>
+inline CordBuffer CordBuffer::CreateWithCustomLimitImpl(
+    size_t block_size, size_t capacity, AllocationHints... hints) {
+  assert(IsPow2(block_size));
+  capacity = (std::min)(capacity, kCustomLimit);
+  block_size = (std::min)(block_size, kCustomLimit);
+  if (capacity + kOverhead >= block_size) {
+    capacity = block_size;
+  } else if (capacity <= kDefaultLimit) {
+    capacity = capacity + kOverhead;
+  } else if (!IsPow2(capacity)) {
+    // Check if rounded up to next power 2 is a good enough fit
+    // with limited waste making it an acceptable direct fit.
+    const size_t rounded_up = size_t{1} << Log2Ceil(capacity);
+    const size_t slop = rounded_up - capacity;
+    if (slop >= kOverhead && slop <= kMaxPageSlop + kOverhead) {
+      capacity = rounded_up;
+    } else {
+      // Round down to highest power of 2 <= capacity.
+      // Consider a more aggressive step down if that may reduce the
+      // risk of fragmentation where 'people are holding it wrong'.
+      const size_t rounded_down = size_t{1} << Log2Floor(capacity);
+      capacity = rounded_down;
+    }
+  }
+  const size_t length = capacity - kOverhead;
+  auto* rep = CordRepFlat::New(CordRepFlat::Large(), length, hints...);
+  rep->length = 0;
+  return CordBuffer(rep);
+}
+
+inline CordBuffer CordBuffer::CreateWithCustomLimit(size_t block_size,
+                                                    size_t capacity) {
+  return CreateWithCustomLimitImpl(block_size, capacity);
+}
+
+inline CordBuffer::~CordBuffer() {
+  if (!rep_.is_short()) {
+    cord_internal::CordRepFlat::Delete(rep_.rep());
+  }
+}
+
+inline CordBuffer::CordBuffer(CordBuffer&& rhs) noexcept : rep_(rhs.rep_) {
+  rhs.rep_.set_short_length(0);
+}
+
+inline CordBuffer& CordBuffer::operator=(CordBuffer&& rhs) noexcept {
+  if (!rep_.is_short()) cord_internal::CordRepFlat::Delete(rep_.rep());
+  rep_ = rhs.rep_;
+  rhs.rep_.set_short_length(0);
+  return *this;
+}
+
+inline absl::Span<char> CordBuffer::available() {
+  return rep_.is_short() ? rep_.short_available() : rep_.long_available();
+}
+
+inline absl::Span<char> CordBuffer::available_up_to(size_t size) {
+  return available().subspan(0, size);
+}
+
+inline char* CordBuffer::data() {
+  return rep_.is_short() ? rep_.data() : rep_.rep()->Data();
+}
+
+inline const char* CordBuffer::data() const {
+  return rep_.is_short() ? rep_.data() : rep_.rep()->Data();
+}
+
+inline size_t CordBuffer::capacity() const {
+  return rep_.is_short() ? Rep::kInlineCapacity : rep_.rep()->Capacity();
+}
+
+inline size_t CordBuffer::length() const {
+  return rep_.is_short() ? rep_.short_length() : rep_.rep()->length;
+}
+
+inline void CordBuffer::SetLength(size_t length) {
+  assert(length <= capacity());
+  if (rep_.is_short()) {
+    rep_.set_short_length(length);
+  } else {
+    rep_.rep()->length = length;
+  }
+}
+
+inline void CordBuffer::IncreaseLengthBy(size_t n) {
+  assert(n <= capacity() && length() + n <= capacity());
+  if (rep_.is_short()) {
+    rep_.add_short_length(n);
+  } else {
+    rep_.rep()->length += n;
+  }
+}
+
+ABSL_NAMESPACE_END
+}  // namespace absl
+
+#endif  // ABSL_STRINGS_CORD_BUFFER_H_
diff --git a/absl/strings/cord_buffer_test.cc b/absl/strings/cord_buffer_test.cc
new file mode 100644
index 0000000..5c7437a
--- /dev/null
+++ b/absl/strings/cord_buffer_test.cc
@@ -0,0 +1,320 @@
+// Copyright 2021 The Abseil Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "absl/strings/cord_buffer.h"
+
+
+#include <algorithm>
+#include <climits>
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "absl/base/config.h"
+#include "absl/strings/internal/cord_rep_flat.h"
+#include "absl/strings/internal/cord_rep_test_util.h"
+#include "absl/types/span.h"
+
+using testing::Eq;
+using testing::Ge;
+using testing::Le;
+using testing::Ne;
+
+namespace absl {
+ABSL_NAMESPACE_BEGIN
+
+class CordBufferTestPeer {
+ public:
+  static cord_internal::CordRep* ConsumeValue(CordBuffer& buffer,
+                                              absl::string_view& short_value) {
+    return buffer.ConsumeValue(short_value);
+  }
+};
+
+namespace {
+
+using ::absl::cordrep_testing::CordToString;
+
+constexpr size_t kInlinedSize = sizeof(CordBuffer) - 1;
+constexpr size_t kDefaultLimit = CordBuffer::kDefaultLimit;
+constexpr size_t kCustomLimit = CordBuffer::kCustomLimit;
+constexpr size_t kMaxFlatSize = cord_internal::kMaxFlatSize;
+constexpr size_t kMaxFlatLength = cord_internal::kMaxFlatLength;
+constexpr size_t kFlatOverhead = cord_internal::kFlatOverhead;
+
+constexpr size_t k8KiB = 8 << 10;
+constexpr size_t k16KiB = 16 << 10;
+constexpr size_t k64KiB = 64 << 10;
+constexpr size_t k1MB = 1 << 20;
+
+class CordBufferTest : public testing::TestWithParam<size_t> {};
+
+INSTANTIATE_TEST_SUITE_P(MediumSize, CordBufferTest,
+                         testing::Values(1, kInlinedSize - 1, kInlinedSize,
+                                         kInlinedSize + 1, kDefaultLimit - 1,
+                                         kDefaultLimit));
+
+TEST_P(CordBufferTest, MaximumPayload) {
+  EXPECT_THAT(CordBuffer::MaximumPayload(), Eq(kMaxFlatLength));
+  EXPECT_THAT(CordBuffer::MaximumPayload(512), Eq(512 - kFlatOverhead));
+  EXPECT_THAT(CordBuffer::MaximumPayload(k64KiB), Eq(k64KiB - kFlatOverhead));
+  EXPECT_THAT(CordBuffer::MaximumPayload(k1MB), Eq(k64KiB - kFlatOverhead));
+}
+
+TEST(CordBufferTest, ConstructDefault) {
+  CordBuffer buffer;
+  EXPECT_THAT(buffer.capacity(), Eq(sizeof(CordBuffer) - 1));
+  EXPECT_THAT(buffer.length(), Eq(0));
+  EXPECT_THAT(buffer.data(), Ne(nullptr));
+  EXPECT_THAT(buffer.available().data(), Eq(buffer.data()));
+  EXPECT_THAT(buffer.available().size(), Eq(buffer.capacity()));
+  memset(buffer.data(), 0xCD, buffer.capacity());
+}
+
+TEST(CordBufferTest, CreateSsoWithDefaultLimit) {
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(3);
+  EXPECT_THAT(buffer.capacity(), Ge(3));
+  EXPECT_THAT(buffer.capacity(), Le(sizeof(CordBuffer)));
+  EXPECT_THAT(buffer.length(), Eq(0));
+  memset(buffer.data(), 0xCD, buffer.capacity());
+
+  memcpy(buffer.data(), "Abc", 3);
+  buffer.SetLength(3);
+  EXPECT_THAT(buffer.length(), Eq(3));
+  absl::string_view short_value;
+  EXPECT_THAT(CordBufferTestPeer::ConsumeValue(buffer, short_value),
+              Eq(nullptr));
+  EXPECT_THAT(absl::string_view(buffer.data(), 3), Eq("Abc"));
+  EXPECT_THAT(short_value, Eq("Abc"));
+}
+
+TEST_P(CordBufferTest, Available) {
+  const size_t requested = GetParam();
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(requested);
+  EXPECT_THAT(buffer.available().data(), Eq(buffer.data()));
+  EXPECT_THAT(buffer.available().size(), Eq(buffer.capacity()));
+
+  buffer.SetLength(2);
+  EXPECT_THAT(buffer.available().data(), Eq(buffer.data() + 2));
+  EXPECT_THAT(buffer.available().size(), Eq(buffer.capacity() - 2));
+}
+
+TEST_P(CordBufferTest, IncreaseLengthBy) {
+  const size_t requested = GetParam();
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(requested);
+  buffer.IncreaseLengthBy(2);
+  EXPECT_THAT(buffer.length(), Eq(2));
+  buffer.IncreaseLengthBy(5);
+  EXPECT_THAT(buffer.length(), Eq(7));
+}
+
+TEST_P(CordBufferTest, AvailableUpTo) {
+  const size_t requested = GetParam();
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(requested);
+  size_t expected_up_to = std::min<size_t>(3, buffer.capacity());
+  EXPECT_THAT(buffer.available_up_to(3).data(), Eq(buffer.data()));
+  EXPECT_THAT(buffer.available_up_to(3).size(), Eq(expected_up_to));
+
+  buffer.SetLength(2);
+  expected_up_to = std::min<size_t>(3, buffer.capacity() - 2);
+  EXPECT_THAT(buffer.available_up_to(3).data(), Eq(buffer.data() + 2));
+  EXPECT_THAT(buffer.available_up_to(3).size(), Eq(expected_up_to));
+}
+
+// Returns the maximum capacity for a given block_size and requested size.
+size_t MaxCapacityFor(size_t block_size, size_t requested) {
+  requested = (std::min)(requested, cord_internal::kMaxLargeFlatSize);
+  // Maximum returned size is always capped at block_size - kFlatOverhead.
+  return block_size - kFlatOverhead;
+}
+
+TEST_P(CordBufferTest, CreateWithDefaultLimit) {
+  const size_t requested = GetParam();
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(requested);
+  EXPECT_THAT(buffer.capacity(), Ge(requested));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(kMaxFlatSize, requested)));
+  EXPECT_THAT(buffer.length(), Eq(0));
+
+  memset(buffer.data(), 0xCD, buffer.capacity());
+
+  std::string data(requested - 1, 'x');
+  memcpy(buffer.data(), data.c_str(), requested);
+  buffer.SetLength(requested);
+
+  EXPECT_THAT(buffer.length(), Eq(requested));
+  EXPECT_THAT(absl::string_view(buffer.data()), Eq(data));
+}
+
+TEST(CordBufferTest, CreateWithDefaultLimitAskingFor2GB) {
+  constexpr size_t k2GiB = 1U << 31;
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(k2GiB);
+  // Expect to never be awarded more than a reasonable memory size, even in
+  // cases where a (debug) memory allocator may grant us somewhat more memory
+  // than `kDefaultLimit` which should be no more than `2 * kDefaultLimit`
+  EXPECT_THAT(buffer.capacity(), Le(2 * CordBuffer::kDefaultLimit));
+  EXPECT_THAT(buffer.length(), Eq(0));
+  EXPECT_THAT(buffer.data(), Ne(nullptr));
+  memset(buffer.data(), 0xCD, buffer.capacity());
+}
+
+TEST_P(CordBufferTest, MoveConstruct) {
+  const size_t requested = GetParam();
+  CordBuffer from = CordBuffer::CreateWithDefaultLimit(requested);
+  const size_t capacity = from.capacity();
+  memcpy(from.data(), "Abc", 4);
+  from.SetLength(4);
+
+  CordBuffer to(std::move(from));
+  EXPECT_THAT(to.capacity(), Eq(capacity));
+  EXPECT_THAT(to.length(), Eq(4));
+  EXPECT_THAT(absl::string_view(to.data()), Eq("Abc"));
+
+  EXPECT_THAT(from.length(), Eq(0));  // NOLINT
+}
+
+TEST_P(CordBufferTest, MoveAssign) {
+  const size_t requested = GetParam();
+  CordBuffer from = CordBuffer::CreateWithDefaultLimit(requested);
+  const size_t capacity = from.capacity();
+  memcpy(from.data(), "Abc", 4);
+  from.SetLength(4);
+
+  CordBuffer to;
+  to = std::move(from);
+  EXPECT_THAT(to.capacity(), Eq(capacity));
+  EXPECT_THAT(to.length(), Eq(4));
+  EXPECT_THAT(absl::string_view(to.data()), Eq("Abc"));
+
+  EXPECT_THAT(from.length(), Eq(0));  // NOLINT
+}
+
+TEST_P(CordBufferTest, ConsumeValue) {
+  const size_t requested = GetParam();
+  CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(requested);
+  memcpy(buffer.data(), "Abc", 4);
+  buffer.SetLength(3);
+
+  absl::string_view short_value;
+  if (cord_internal::CordRep* rep =
+          CordBufferTestPeer::ConsumeValue(buffer, short_value)) {
+    EXPECT_THAT(CordToString(rep), Eq("Abc"));
+    cord_internal::CordRep::Unref(rep);
+  } else {
+    EXPECT_THAT(short_value, Eq("Abc"));
+  }
+  EXPECT_THAT(buffer.length(), Eq(0));
+}
+
+TEST_P(CordBufferTest, CreateWithCustomLimitWithinDefaultLimit) {
+  const size_t requested = GetParam();
+  CordBuffer buffer =
+      CordBuffer::CreateWithCustomLimit(kMaxFlatSize, requested);
+  EXPECT_THAT(buffer.capacity(), Ge(requested));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(kMaxFlatSize, requested)));
+  EXPECT_THAT(buffer.length(), Eq(0));
+
+  memset(buffer.data(), 0xCD, buffer.capacity());
+
+  std::string data(requested - 1, 'x');
+  memcpy(buffer.data(), data.c_str(), requested);
+  buffer.SetLength(requested);
+
+  EXPECT_THAT(buffer.length(), Eq(requested));
+  EXPECT_THAT(absl::string_view(buffer.data()), Eq(data));
+}
+
+TEST(CordLargeBufferTest, CreateAtOrBelowDefaultLimit) {
+  CordBuffer buffer = CordBuffer::CreateWithCustomLimit(k64KiB, kDefaultLimit);
+  EXPECT_THAT(buffer.capacity(), Ge(kDefaultLimit));
+  EXPECT_THAT(buffer.capacity(),
+              Le(MaxCapacityFor(kMaxFlatSize, kDefaultLimit)));
+
+  buffer = CordBuffer::CreateWithCustomLimit(k64KiB, 3178);
+  EXPECT_THAT(buffer.capacity(), Ge(3178));
+}
+
+TEST(CordLargeBufferTest, CreateWithCustomLimit) {
+  ASSERT_THAT((kMaxFlatSize & (kMaxFlatSize - 1)) == 0, "Must be power of 2");
+
+  for (size_t size = kMaxFlatSize; size <= kCustomLimit; size *= 2) {
+    CordBuffer buffer = CordBuffer::CreateWithCustomLimit(size, size);
+    size_t expected = size - kFlatOverhead;
+    ASSERT_THAT(buffer.capacity(), Ge(expected));
+    EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(size, expected)));
+  }
+}
+
+TEST(CordLargeBufferTest, CreateWithTooLargeLimit) {
+  CordBuffer buffer = CordBuffer::CreateWithCustomLimit(k64KiB, k1MB);
+  ASSERT_THAT(buffer.capacity(), Ge(k64KiB - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(k64KiB, k1MB)));
+}
+
+TEST(CordLargeBufferTest, CreateWithHugeValueForOverFlowHardening) {
+  for (size_t dist_from_max = 0; dist_from_max <= 32; ++dist_from_max) {
+    size_t capacity = std::numeric_limits<size_t>::max() - dist_from_max;
+
+    CordBuffer buffer = CordBuffer::CreateWithDefaultLimit(capacity);
+    ASSERT_THAT(buffer.capacity(), Ge(kDefaultLimit));
+    EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(kMaxFlatSize, capacity)));
+
+    for (size_t limit = kMaxFlatSize; limit <= kCustomLimit; limit *= 2) {
+      CordBuffer buffer = CordBuffer::CreateWithCustomLimit(limit, capacity);
+      ASSERT_THAT(buffer.capacity(), Ge(limit - kFlatOverhead));
+      EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(limit, capacity)));
+    }
+  }
+}
+
+TEST(CordLargeBufferTest, CreateWithSmallLimit) {
+  CordBuffer buffer = CordBuffer::CreateWithCustomLimit(512, 1024);
+  ASSERT_THAT(buffer.capacity(), Ge(512 - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(512, 1024)));
+
+  // Ask for precise block size, should return size - kOverhead
+  buffer = CordBuffer::CreateWithCustomLimit(512, 512);
+  ASSERT_THAT(buffer.capacity(), Ge(512 - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(512, 512)));
+
+  // Corner case: 511 < block_size, but 511 + kOverhead is above
+  buffer = CordBuffer::CreateWithCustomLimit(512, 511);
+  ASSERT_THAT(buffer.capacity(), Ge(512 - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(512, 511)));
+
+  // Corner case: 498 + kOverhead < block_size
+  buffer = CordBuffer::CreateWithCustomLimit(512, 498);
+  ASSERT_THAT(buffer.capacity(), Ge(512 - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(512, 498)));
+}
+
+TEST(CordLargeBufferTest, CreateWasteFull) {
+  // 15 KiB gets rounded down to next pow2 value.
+  const size_t requested = (15 << 10);
+  CordBuffer buffer = CordBuffer::CreateWithCustomLimit(k16KiB, requested);
+  ASSERT_THAT(buffer.capacity(), Ge(k8KiB - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(k8KiB, requested)));
+}
+
+TEST(CordLargeBufferTest, CreateSmallSlop) {
+  const size_t requested = k16KiB - 2 * kFlatOverhead;
+  CordBuffer buffer = CordBuffer::CreateWithCustomLimit(k16KiB, requested);
+  ASSERT_THAT(buffer.capacity(), Ge(k16KiB - kFlatOverhead));
+  EXPECT_THAT(buffer.capacity(), Le(MaxCapacityFor(k16KiB, requested)));
+}
+
+}  // namespace
+ABSL_NAMESPACE_END
+}  // namespace absl
diff --git a/absl/strings/cord_test.cc b/absl/strings/cord_test.cc
index 4261c99..0862f69 100644
--- a/absl/strings/cord_test.cc
+++ b/absl/strings/cord_test.cc
@@ -597,6 +597,284 @@
                                 "copying ", "to ", "a ", "string."})));
 }
 
+TEST_P(CordTest, AppendEmptyBuffer) {
+  absl::Cord cord;
+  cord.Append(absl::CordBuffer());
+  cord.Append(absl::CordBuffer::CreateWithDefaultLimit(2000));
+}
+
+TEST_P(CordTest, AppendEmptyBufferToFlat) {
+  absl::Cord cord(std::string(2000, 'x'));
+  cord.Append(absl::CordBuffer());
+  cord.Append(absl::CordBuffer::CreateWithDefaultLimit(2000));
+}
+
+TEST_P(CordTest, AppendEmptyBufferToTree) {
+  absl::Cord cord(std::string(2000, 'x'));
+  cord.Append(std::string(2000, 'y'));
+  cord.Append(absl::CordBuffer());
+  cord.Append(absl::CordBuffer::CreateWithDefaultLimit(2000));
+}
+
+TEST_P(CordTest, AppendSmallBuffer) {
+  absl::Cord cord;
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(3);
+  ASSERT_THAT(buffer.capacity(), ::testing::Le(15));
+  memcpy(buffer.data(), "Abc", 3);
+  buffer.SetLength(3);
+  cord.Append(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  buffer = absl::CordBuffer::CreateWithDefaultLimit(3);
+  memcpy(buffer.data(), "defgh", 5);
+  buffer.SetLength(5);
+  cord.Append(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  EXPECT_THAT(cord.Chunks(), ::testing::ElementsAre("Abcdefgh"));
+}
+
+TEST_P(CordTest, AppendAndPrependBufferArePrecise) {
+  // Create a cord large enough to force 40KB flats.
+  std::string test_data(absl::cord_internal::kMaxFlatLength * 10, 'x');
+  absl::Cord cord1(test_data);
+  absl::Cord cord2(test_data);
+  const size_t size1 = cord1.EstimatedMemoryUsage();
+  const size_t size2 = cord2.EstimatedMemoryUsage();
+
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(3);
+  memcpy(buffer.data(), "Abc", 3);
+  buffer.SetLength(3);
+  cord1.Append(std::move(buffer));
+
+  buffer = absl::CordBuffer::CreateWithDefaultLimit(3);
+  memcpy(buffer.data(), "Abc", 3);
+  buffer.SetLength(3);
+  cord2.Prepend(std::move(buffer));
+
+#ifndef NDEBUG
+  // Allow 32 bytes new CordRepFlat, and 128 bytes for 'glue nodes'
+  constexpr size_t kMaxDelta = 128 + 32;
+#else
+  // Allow 256 bytes extra for 'allocation debug overhead'
+  constexpr size_t kMaxDelta = 128 + 32 + 256;
+#endif
+
+  EXPECT_LE(cord1.EstimatedMemoryUsage() - size1, kMaxDelta);
+  EXPECT_LE(cord2.EstimatedMemoryUsage() - size2, kMaxDelta);
+
+  EXPECT_EQ(cord1, absl::StrCat(test_data, "Abc"));
+  EXPECT_EQ(cord2, absl::StrCat("Abc", test_data));
+}
+
+TEST_P(CordTest, PrependSmallBuffer) {
+  absl::Cord cord;
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(3);
+  ASSERT_THAT(buffer.capacity(), ::testing::Le(15));
+  memcpy(buffer.data(), "Abc", 3);
+  buffer.SetLength(3);
+  cord.Prepend(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  buffer = absl::CordBuffer::CreateWithDefaultLimit(3);
+  memcpy(buffer.data(), "defgh", 5);
+  buffer.SetLength(5);
+  cord.Prepend(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  EXPECT_THAT(cord.Chunks(), ::testing::ElementsAre("defghAbc"));
+}
+
+TEST_P(CordTest, AppendLargeBuffer) {
+  absl::Cord cord;
+
+  std::string s1(700, '1');
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(s1.size());
+  memcpy(buffer.data(), s1.data(), s1.size());
+  buffer.SetLength(s1.size());
+  cord.Append(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  std::string s2(1000, '2');
+  buffer = absl::CordBuffer::CreateWithDefaultLimit(s2.size());
+  memcpy(buffer.data(), s2.data(), s2.size());
+  buffer.SetLength(s2.size());
+  cord.Append(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  EXPECT_THAT(cord.Chunks(), ::testing::ElementsAre(s1, s2));
+}
+
+TEST_P(CordTest, PrependLargeBuffer) {
+  absl::Cord cord;
+
+  std::string s1(700, '1');
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(s1.size());
+  memcpy(buffer.data(), s1.data(), s1.size());
+  buffer.SetLength(s1.size());
+  cord.Prepend(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  std::string s2(1000, '2');
+  buffer = absl::CordBuffer::CreateWithDefaultLimit(s2.size());
+  memcpy(buffer.data(), s2.data(), s2.size());
+  buffer.SetLength(s2.size());
+  cord.Prepend(std::move(buffer));
+  EXPECT_EQ(buffer.length(), 0);    // NOLINT
+  EXPECT_GT(buffer.capacity(), 0);  // NOLINT
+
+  EXPECT_THAT(cord.Chunks(), ::testing::ElementsAre(s2, s1));
+}
+
+TEST_P(CordTest, GetAppendBufferOnEmptyCord) {
+  absl::Cord cord;
+  absl::CordBuffer buffer = cord.GetAppendBuffer(1000);
+  EXPECT_GE(buffer.capacity(), 1000);
+  EXPECT_EQ(buffer.length(), 0);
+}
+
+TEST_P(CordTest, GetAppendBufferOnInlinedCord) {
+  static constexpr int kInlinedSize = sizeof(absl::CordBuffer) - 1;
+  for (int size : {6, kInlinedSize - 3, kInlinedSize - 2, 1000}) {
+    absl::Cord cord("Abc");
+    absl::CordBuffer buffer = cord.GetAppendBuffer(size, 1);
+    EXPECT_GE(buffer.capacity(), 3 + size);
+    EXPECT_EQ(buffer.length(), 3);
+    EXPECT_EQ(absl::string_view(buffer.data(), buffer.length()), "Abc");
+    EXPECT_TRUE(cord.empty());
+  }
+}
+
+TEST_P(CordTest, GetAppendBufferOnInlinedCordWithCapacityCloseToMax) {
+  // Cover the use case where we have a non empty inlined cord with some size
+  // 'n', and ask for something like 'uint64_max - k', assuming internal logic
+  // could overflow on 'uint64_max - k + size', and return a valid, but
+  // inefficiently smaller buffer if it would provide is the max allowed size.
+  for (size_t dist_from_max = 0; dist_from_max <= 4; ++dist_from_max) {
+    absl::Cord cord("Abc");
+    size_t size = std::numeric_limits<size_t>::max() - dist_from_max;
+    absl::CordBuffer buffer = cord.GetAppendBuffer(size, 1);
+    EXPECT_EQ(buffer.capacity(), absl::CordBuffer::kDefaultLimit);
+    EXPECT_EQ(buffer.length(), 3);
+    EXPECT_EQ(absl::string_view(buffer.data(), buffer.length()), "Abc");
+    EXPECT_TRUE(cord.empty());
+  }
+}
+
+TEST_P(CordTest, GetAppendBufferOnFlat) {
+  // Create a cord with a single flat and extra capacity
+  absl::Cord cord;
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+  buffer.SetLength(3);
+  memcpy(buffer.data(), "Abc", 3);
+  cord.Append(std::move(buffer));
+
+  buffer = cord.GetAppendBuffer(6);
+  EXPECT_GE(buffer.capacity(), 500);
+  EXPECT_EQ(buffer.length(), 3);
+  EXPECT_EQ(absl::string_view(buffer.data(), buffer.length()), "Abc");
+  EXPECT_TRUE(cord.empty());
+}
+
+TEST_P(CordTest, GetAppendBufferOnFlatWithoutMinCapacity) {
+  // Create a cord with a single flat and extra capacity
+  absl::Cord cord;
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+  buffer.SetLength(30);
+  memset(buffer.data(), 'x', 30);
+  cord.Append(std::move(buffer));
+
+  buffer = cord.GetAppendBuffer(1000, 900);
+  EXPECT_GE(buffer.capacity(), 1000);
+  EXPECT_EQ(buffer.length(), 0);
+  EXPECT_EQ(cord, std::string(30, 'x'));
+}
+
+TEST_P(CordTest, GetAppendBufferOnTree) {
+  RandomEngine rng;
+  for (int num_flats : {2, 3, 100}) {
+    // Create a cord with `num_flats` flats and extra capacity
+    absl::Cord cord;
+    std::string prefix;
+    std::string last;
+    for (int i = 0; i < num_flats - 1; ++i) {
+      prefix += last;
+      last = RandomLowercaseString(&rng, 10);
+      absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+      buffer.SetLength(10);
+      memcpy(buffer.data(), last.data(), 10);
+      cord.Append(std::move(buffer));
+    }
+    absl::CordBuffer buffer = cord.GetAppendBuffer(6);
+    EXPECT_GE(buffer.capacity(), 500);
+    EXPECT_EQ(buffer.length(), 10);
+    EXPECT_EQ(absl::string_view(buffer.data(), buffer.length()), last);
+    EXPECT_EQ(cord, prefix);
+  }
+}
+
+TEST_P(CordTest, GetAppendBufferOnTreeWithoutMinCapacity) {
+  absl::Cord cord;
+  for (int i = 0; i < 2; ++i) {
+    absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+    buffer.SetLength(3);
+    memcpy(buffer.data(), i ? "def" : "Abc", 3);
+    cord.Append(std::move(buffer));
+  }
+  absl::CordBuffer buffer = cord.GetAppendBuffer(1000, 900);
+  EXPECT_GE(buffer.capacity(), 1000);
+  EXPECT_EQ(buffer.length(), 0);
+  EXPECT_EQ(cord, "Abcdef");
+}
+
+TEST_P(CordTest, GetAppendBufferOnSubstring) {
+  // Create a large cord with a single flat and some extra capacity
+  absl::Cord cord;
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+  buffer.SetLength(450);
+  memset(buffer.data(), 'x', 450);
+  cord.Append(std::move(buffer));
+  cord.RemovePrefix(1);
+
+  // Deny on substring
+  buffer = cord.GetAppendBuffer(6);
+  EXPECT_EQ(buffer.length(), 0);
+  EXPECT_EQ(cord, std::string(449, 'x'));
+}
+
+TEST_P(CordTest, GetAppendBufferOnSharedCord) {
+  // Create a shared cord with a single flat and extra capacity
+  absl::Cord cord;
+  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+  buffer.SetLength(3);
+  memcpy(buffer.data(), "Abc", 3);
+  cord.Append(std::move(buffer));
+  absl::Cord shared_cord = cord;
+
+  // Deny on flat
+  buffer = cord.GetAppendBuffer(6);
+  EXPECT_EQ(buffer.length(), 0);
+  EXPECT_EQ(cord, "Abc");
+
+  buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
+  buffer.SetLength(3);
+  memcpy(buffer.data(), "def", 3);
+  cord.Append(std::move(buffer));
+  shared_cord = cord;
+
+  // Deny on tree
+  buffer = cord.GetAppendBuffer(6);
+  EXPECT_EQ(buffer.length(), 0);
+  EXPECT_EQ(cord, "Abcdef");
+}
+
 TEST_P(CordTest, TryFlatEmpty) {
   absl::Cord c;
   EXPECT_EQ(c.TryFlat(), "");