Add Proto Arena TypedBlock
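
This introduces internal::TypedBlock<T>, a typed block holding a small inline
array of T that is chained into a singly linked list, and uses
TypedBlock<std::string> ("StringBlock") in SerialArena so that arena-owned
strings are carved out of dedicated blocks and destroyed in bulk. It also
splits the former arena_impl.h into serial_arena.h and thread_safe_arena.h.

For orientation only, a minimal standalone sketch of the typed-block idea
(names and details are illustrative rather than the shipped class, and it
assumes alignof(T) does not exceed the header's alignment, which holds for
std::string):

  #include <cstddef>
  #include <new>

  template <typename T>
  class SimpleTypedBlock {
   public:
    // Allocates a header followed by `capacity` uninitialized slots of T.
    static SimpleTypedBlock* Create(size_t capacity, SimpleTypedBlock* next) {
      void* mem =
          ::operator new(sizeof(SimpleTypedBlock) + sizeof(T) * capacity);
      return new (mem) SimpleTypedBlock(capacity, next);
    }
    static void Delete(SimpleTypedBlock* block) { ::operator delete(block); }

    // Returns storage for one T (the caller placement-news into it), or
    // nullptr if the block is full.
    T* TryAllocate() { return count_ < capacity_ ? data() + count_++ : nullptr; }

    // Destroys every T handed out from this block.
    void DestroyAll() {
      for (size_t i = 0; i < count_; ++i) data()[i].~T();
    }

    SimpleTypedBlock* next() const { return next_; }

   private:
    SimpleTypedBlock(size_t capacity, SimpleTypedBlock* next)
        : next_(next), capacity_(capacity) {}
    T* data() { return reinterpret_cast<T*>(this + 1); }

    SimpleTypedBlock* const next_;
    const size_t capacity_;
    size_t count_ = 0;
  };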

PiperOrigin-RevId: 495003404
diff --git a/src/file_lists.cmake b/src/file_lists.cmake
index 1e326bc..b7367ef 100644
--- a/src/file_lists.cmake
+++ b/src/file_lists.cmake
@@ -103,7 +103,6 @@
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_allocation_policy.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_cleanup.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_config.h
-  ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_impl.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arenastring.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arenaz_sampler.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/compiler/importer.h
@@ -170,6 +169,7 @@
   ${protobuf_SOURCE_DIR}/src/google/protobuf/repeated_field.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/repeated_ptr_field.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/service.h
+  ${protobuf_SOURCE_DIR}/src/google/protobuf/serial_arena.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/callback.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/common.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/logging.h
@@ -178,6 +178,7 @@
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/port.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/status_macros.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/text_format.h
+  ${protobuf_SOURCE_DIR}/src/google/protobuf/thread_safe_arena.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/unknown_field_set.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/util/delimited_message_util.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/util/field_comparator.h
@@ -227,7 +228,6 @@
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_allocation_policy.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_cleanup.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_config.h
-  ${protobuf_SOURCE_DIR}/src/google/protobuf/arena_impl.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arenastring.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/arenaz_sampler.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/endian.h
@@ -258,6 +258,7 @@
   ${protobuf_SOURCE_DIR}/src/google/protobuf/port_undef.inc
   ${protobuf_SOURCE_DIR}/src/google/protobuf/repeated_field.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/repeated_ptr_field.h
+  ${protobuf_SOURCE_DIR}/src/google/protobuf/serial_arena.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/callback.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/common.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/logging.h
@@ -265,6 +266,7 @@
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/platform_macros.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/port.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/stubs/status_macros.h
+  ${protobuf_SOURCE_DIR}/src/google/protobuf/thread_safe_arena.h
   ${protobuf_SOURCE_DIR}/src/google/protobuf/wire_format_lite.h
 )
 
diff --git a/src/google/protobuf/BUILD.bazel b/src/google/protobuf/BUILD.bazel
index 4902ce2..9581b91 100644
--- a/src/google/protobuf/BUILD.bazel
+++ b/src/google/protobuf/BUILD.bazel
@@ -244,8 +244,9 @@
     hdrs = [
         "arena.h",
         "arena_config.h",
-        "arena_impl.h",
         "arenaz_sampler.h",
+        "serial_arena.h",
+        "thread_safe_arena.h",
     ],
     include_prefix = "google/protobuf",
     visibility = [
@@ -284,7 +285,6 @@
     hdrs = [
         "any.h",
         "arena.h",
-        "arena_impl.h",
         "arenastring.h",
         "arenaz_sampler.h",
         "endian.h",
@@ -308,6 +308,8 @@
         "port.h",
         "repeated_field.h",
         "repeated_ptr_field.h",
+        "serial_arena.h",
+        "thread_safe_arena.h",
         "wire_format_lite.h",
     ],
     copts = COPTS + select({
@@ -326,6 +328,7 @@
     # In Bazel 6.0+, these will be `interface_deps`:
     deps = [
         ":arena",
+        ":arena_align",
         ":arena_config",
         "//src/google/protobuf/io",
         "//src/google/protobuf/stubs:lite",
diff --git a/src/google/protobuf/arena.cc b/src/google/protobuf/arena.cc
index 2a62414..b208882 100644
--- a/src/google/protobuf/arena.cc
+++ b/src/google/protobuf/arena.cc
@@ -35,14 +35,19 @@
 #include <cstddef>
 #include <cstdint>
 #include <limits>
+#include <string>
 #include <typeinfo>
 
 #include "absl/base/attributes.h"
+#include "google/protobuf/stubs/logging.h"
 #include "absl/synchronization/mutex.h"
+#include "google/protobuf/arena_align.h"
 #include "google/protobuf/arena_allocation_policy.h"
-#include "google/protobuf/arena_impl.h"
 #include "google/protobuf/arenaz_sampler.h"
 #include "google/protobuf/port.h"
+#include "google/protobuf/serial_arena.h"
+#include "google/protobuf/thread_safe_arena.h"
 
 
 #ifdef ADDRESS_SANITIZER
@@ -161,6 +166,7 @@
   set_ptr(b->Pointer(offset));
   limit_ = b->Limit();
   head_.store(b, std::memory_order_relaxed);
+  strings_ = StringBlock::sentinel();
   space_used_.store(0, std::memory_order_relaxed);
   space_allocated_.store(b->size, std::memory_order_relaxed);
   cached_block_length_ = 0;
@@ -195,21 +201,45 @@
   return AllocateFromExisting(n);
 }
 
-PROTOBUF_NOINLINE
-void* SerialArena::AllocateAlignedWithCleanupFallback(
-    size_t n, size_t align, void (*destructor)(void*)) {
-  size_t required = AlignUpTo(n, align) + cleanup::Size(destructor);
-  AllocateNewBlock(required);
-  return AllocateFromExistingWithCleanupFallback(n, align, destructor);
+void* SerialArena::AllocateEmbeddedFallback(size_t size, cleanup::Tag tag) {
+  AllocateNewBlock(size);
+  return BlindlyAllocateEmbedded(size, tag);
 }
 
-PROTOBUF_NOINLINE
-void SerialArena::AddCleanupFallback(void* elem, void (*destructor)(void*)) {
-  size_t required = cleanup::Size(destructor);
-  AllocateNewBlock(required);
-  AddCleanupFromExisting(elem, destructor);
+// https://godbolt.org/z/EcGPxbMdW
+std::string* SerialArena::AllocateStringFallback() {
+  constexpr size_t kMin = 10;
+  constexpr size_t kMax = 170;
+  size_t n = std::min(std::max(strings_->capacity() * 2, kMin), kMax);
+  strings_ = StringBlock::Create(n, strings_);
+  return strings_->Allocate();
 }
 
+template <typename Align, typename TagOrDtor>
+void* SerialArena::AllocateWithCleanupFallback(size_t size, Align align,
+                                               TagOrDtor cleanup) {
+  AllocateNewBlock(align.Padded(size) + cleanup::CleanupSize(cleanup));
+  return BlindlyAllocateWithCleanup(size, align, cleanup);
+}
+
+template void* SerialArena::AllocateWithCleanupFallback(size_t,
+                                                        ArenaAlignDefault,
+                                                        cleanup::Tag);
+template void* SerialArena::AllocateWithCleanupFallback(size_t,
+                                                        ArenaAlignDefault,
+                                                        void (*)(void*));
+template void* SerialArena::AllocateWithCleanupFallback(size_t, ArenaAlign,
+                                                        void (*)(void*));
+
+template <typename TagOrCleanup>
+void SerialArena::AddCleanupFallback(void* elem, TagOrCleanup cleanup) {
+  AllocateNewBlock(cleanup::CleanupSize(cleanup));
+  BlindlyAddCleanup(elem, cleanup);
+}
+
+template void SerialArena::AddCleanupFallback(void*, cleanup::Tag);
+template void SerialArena::AddCleanupFallback(void*, void (*)(void*));
+
 void SerialArena::AllocateNewBlock(size_t n) {
   size_t used = 0;
   size_t wasted = 0;
@@ -271,6 +301,13 @@
 }
 
 void SerialArena::CleanupList() {
+  StringBlock* block = strings_;
+  while (StringBlock* next = block->next()) {
+    block->DestroyAll();
+    StringBlock::Delete(block);
+    block = next;
+  }
+
   ArenaBlock* b = head();
   if (b->IsSentry()) return;
 
@@ -694,29 +731,31 @@
   return space_allocated;
 }
 
-void* ThreadSafeArena::AllocateAlignedWithCleanup(size_t n, size_t align,
-                                                  void (*destructor)(void*)) {
-  SerialArena* arena;
-  if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
-    return arena->AllocateAlignedWithCleanup(n, align, destructor);
-  } else {
-    return AllocateAlignedWithCleanupFallback(n, align, destructor);
-  }
+template <typename Align, typename TagOrDtor>
+void* ThreadSafeArena::AllocateWithCleanupFallback(size_t size, Align align,
+                                                   TagOrDtor cleanup) {
+  const size_t n = align.Padded(size) + cleanup::CleanupSize(cleanup);
+  SerialArena* arena = GetSerialArenaFallback(n);
+  return arena->AllocateWithCleanup(size, align, cleanup);
 }
 
-void ThreadSafeArena::AddCleanup(void* elem, void (*cleanup)(void*)) {
-  SerialArena* arena;
-  if (PROTOBUF_PREDICT_FALSE(!GetSerialArenaFast(&arena))) {
-    arena = GetSerialArenaFallback(kMaxCleanupNodeSize);
-  }
-  arena->AddCleanup(elem, cleanup);
+template void* ThreadSafeArena::AllocateWithCleanupFallback(size_t,
+                                                            ArenaAlignDefault,
+                                                            cleanup::Tag);
+template void* ThreadSafeArena::AllocateWithCleanupFallback(size_t,
+                                                            ArenaAlignDefault,
+                                                            void (*)(void*));
+template void* ThreadSafeArena::AllocateWithCleanupFallback(size_t, ArenaAlign,
+                                                            void (*)(void*));
+
+void* ThreadSafeArena::AllocateEmbeddedFallback(size_t size, cleanup::Tag tag) {
+  SerialArena* arena = GetSerialArenaFallback(size);
+  return arena->AllocateEmbedded(size, tag);
 }
 
-PROTOBUF_NOINLINE
-void* ThreadSafeArena::AllocateAlignedWithCleanupFallback(
-    size_t n, size_t align, void (*destructor)(void*)) {
-  return GetSerialArenaFallback(n + kMaxCleanupNodeSize)
-      ->AllocateAlignedWithCleanup(n, align, destructor);
+std::string* ThreadSafeArena::AllocateStringFallback() {
+  SerialArena* arena = GetSerialArenaFallback(0);
+  return arena->AllocateString();
 }
 
 template <typename Functor>
@@ -845,11 +884,32 @@
   return impl_.AllocateAligned<internal::AllocationClient::kArray>(n);
 }
 
-void* Arena::AllocateAlignedWithCleanup(size_t n, size_t align,
-                                        void (*destructor)(void*)) {
-  return impl_.AllocateAlignedWithCleanup(n, align, destructor);
+template <typename TagOrCleanup>
+void Arena::AddCleanup(void* object, TagOrCleanup cleanup) {
+  impl_.AddCleanup(object, cleanup);
 }
 
+template void Arena::AddCleanup(void*, internal::cleanup::Tag);
+template void Arena::AddCleanup(void*, void (*)(void*));
+
+void* Arena::AllocateEmbedded(size_t size, internal::cleanup::Tag tag) {
+  return impl_.AllocateEmbedded(size, tag);
+}
+
+std::string* Arena::AllocateString() { return impl_.AllocateString(); }
+
+template <typename Align, typename TagOrDtor>
+void* Arena::AllocateWithCleanup(size_t size, Align align, TagOrDtor cleanup) {
+  return impl_.AllocateWithCleanup(size, align, cleanup);
+}
+
+template void* Arena::AllocateWithCleanup(size_t, internal::ArenaAlignDefault,
+                                          internal::cleanup::Tag);
+template void* Arena::AllocateWithCleanup(size_t, internal::ArenaAlignDefault,
+                                          void (*)(void*));
+template void* Arena::AllocateWithCleanup(size_t, internal::ArenaAlign,
+                                          void (*)(void*));
+
 }  // namespace protobuf
 }  // namespace google
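
For context, a note on the string-block growth above: each new StringBlock
doubles the capacity of the previous one, clamped to the [kMin, kMax] range.
A tiny standalone snippet (illustrative only; the constants mirror
AllocateStringFallback, and the starting point is the zero-capacity sentinel):

  #include <algorithm>
  #include <cstddef>
  #include <cstdio>

  int main() {
    constexpr size_t kMin = 10, kMax = 170;
    size_t capacity = 0;  // the sentinel StringBlock holds zero strings
    for (int i = 0; i < 6; ++i) {
      capacity = std::min(std::max(capacity * 2, kMin), kMax);
      std::printf("block %d holds up to %zu strings\n", i + 1, capacity);
    }
    // Prints 10, 20, 40, 80, 160, 170: geometric growth capped at kMax.
  }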
 
diff --git a/src/google/protobuf/arena.h b/src/google/protobuf/arena.h
index 8a7d1b4..708f918 100644
--- a/src/google/protobuf/arena.h
+++ b/src/google/protobuf/arena.h
@@ -51,8 +51,9 @@
 #include <type_traits>
 #include "google/protobuf/arena_align.h"
 #include "google/protobuf/arena_config.h"
-#include "google/protobuf/arena_impl.h"
 #include "google/protobuf/port.h"
+#include "google/protobuf/serial_arena.h"
+#include "google/protobuf/thread_safe_arena.h"
 
 // Must be included last.
 #include "google/protobuf/port_def.inc"
@@ -274,11 +275,10 @@
     if (PROTOBUF_PREDICT_FALSE(arena == nullptr)) {
       return new T(std::forward<Args>(args)...);
     }
-    auto destructor =
-        internal::ObjectDestructor<std::is_trivially_destructible<T>::value,
-                                   T>::destructor;
-    return new (arena->AllocateInternal(sizeof(T), alignof(T), destructor))
-        T(std::forward<Args>(args)...);
+    void* mem = std::is_trivially_destructible<T>::value
+                    ? arena->AllocateAligned(sizeof(T), alignof(T))
+                    : arena->AllocateWithCleanup<T>();
+    return new (mem) T(std::forward<Args>(args)...);
   }
 
   // API to delete any objects not on an arena.  This can be used to safely
@@ -375,9 +375,7 @@
   // arena-allocated memory.
   template <typename T>
   PROTOBUF_ALWAYS_INLINE void OwnDestructor(T* object) {
-    if (object != nullptr) {
-      impl_.AddCleanup(object, &internal::cleanup::arena_destruct_object<T>);
-    }
+    if (object != nullptr) AddCleanup(object);
   }
 
   // Adds a custom member function on an object to the list of destructors that
@@ -565,15 +563,8 @@
     }
   }
 
-  PROTOBUF_NDEBUG_INLINE void* AllocateInternal(size_t size, size_t align,
-                                                void (*destructor)(void*)) {
-    // Monitor allocation if needed.
-    if (destructor == nullptr) {
-      return AllocateAligned(size, align);
-    } else {
-      return AllocateAlignedWithCleanup(size, align, destructor);
-    }
-  }
+  template <typename T, bool enable_tags = internal::cleanup::EnableTags()>
+  void* AllocateWithCleanup();
 
   // CreateMessage<T> requires that T supports arenas, but this private method
   // works whether or not T supports arenas. These are not exposed to user code
@@ -603,12 +594,10 @@
 
   template <typename T, typename... Args>
   PROTOBUF_NDEBUG_INLINE T* DoCreateMessage(Args&&... args) {
-    return InternalHelper<T>::Construct(
-        AllocateInternal(sizeof(T), alignof(T),
-                         internal::ObjectDestructor<
-                             InternalHelper<T>::is_destructor_skippable::value,
-                             T>::destructor),
-        this, std::forward<Args>(args)...);
+    void* mem = InternalHelper<T>::is_destructor_skippable::value
+                    ? AllocateAligned(sizeof(T), alignof(T))
+                    : AllocateWithCleanup<T>();
+    return InternalHelper<T>::Construct(mem, this, std::forward<Args>(args)...);
   }
 
   // CreateInArenaStorage is used to implement map field. Without it,
@@ -670,8 +659,17 @@
 
   void* Allocate(size_t n);
   void* AllocateForArray(size_t n);
-  void* AllocateAlignedWithCleanup(size_t n, size_t align,
-                                   void (*destructor)(void*));
+
+  template <typename T, bool enable_tags = internal::cleanup::EnableTags()>
+  void AddCleanup(T* object);
+
+  template <typename TagOrCleanup>
+  void AddCleanup(void* object, TagOrCleanup cleanup);
+
+  template <typename Align, typename TagOrDtor>
+  void* AllocateWithCleanup(size_t size, Align align, TagOrDtor cleanup);
+  void* AllocateEmbedded(size_t size, internal::cleanup::Tag tag);
+  std::string* AllocateString();
 
   template <typename Type>
   friend class internal::GenericTypeHandler;
@@ -688,6 +686,50 @@
   friend struct internal::ArenaTestPeer;
 };
 
+// Default implementation calls `AddCleanup(T* object, dtor<T>)`.
+template <typename T, bool enable_tags>
+inline PROTOBUF_NDEBUG_INLINE void Arena::AddCleanup(T* object) {
+  AddCleanup(object, &internal::cleanup::arena_destruct_object<T>);
+}
+
+#ifdef PROTO_USE_TAGGED_CLEANUPS
+
+// Specialization for `AddCleanup<std::string>()`
+template <>
+inline PROTOBUF_NDEBUG_INLINE void Arena::AddCleanup<std::string, true>(
+    std::string* object) {
+  AddCleanup(object, internal::cleanup::Tag::kString);
+}
+
+// Specialization for `AddCleanup<absl::Cord>()`
+template <>
+inline PROTOBUF_NDEBUG_INLINE void Arena::AddCleanup<absl::Cord, true>(
+    absl::Cord* object) {
+  AddCleanup(object, internal::cleanup::Tag::kCord);
+}
+#endif  // PROTO_USE_TAGGED_CLEANUPS
+
+template <typename T, bool enable_tags>
+inline PROTOBUF_NDEBUG_INLINE void* Arena::AllocateWithCleanup() {
+  constexpr auto align = internal::ArenaAlignOf<T>();
+  return AllocateWithCleanup(align.Ceil(sizeof(T)), align,
+                             &internal::cleanup::arena_destruct_object<T>);
+}
+
+template <>
+inline PROTOBUF_NDEBUG_INLINE void*
+Arena::AllocateWithCleanup<std::string, true>() {
+  return AllocateString();
+}
+
+template <>
+inline PROTOBUF_NDEBUG_INLINE void*
+Arena::AllocateWithCleanup<absl::Cord, true>() {
+  return AllocateWithCleanup(internal::cleanup::AllocationSize<absl::Cord>(),
+                             internal::ArenaAlignDefault(),
+                             internal::cleanup::Tag::kCord);
+}
+
 }  // namespace protobuf
 }  // namespace google
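
At the call site nothing changes; Arena::Create<T>() picks the allocation path
from T's destructor properties. A hedged usage sketch using only the public
Arena API (the comments describe the intended routing after this change):

  #include <string>
  #include "google/protobuf/arena.h"

  void Example() {
    google::protobuf::Arena arena;
    // Trivially destructible: plain AllocateAligned, no cleanup node is added.
    int* n = google::protobuf::Arena::Create<int>(&arena, 42);
    // Non-trivially destructible: AllocateWithCleanup<T>() registers a cleanup.
    // With tagged cleanups enabled, std::string is served from the StringBlock
    // path instead of carrying a per-object destructor pointer.
    std::string* s =
        google::protobuf::Arena::Create<std::string>(&arena, "hello");
    (void)n;
    (void)s;
    // Both allocations are reclaimed (and *s destroyed) with the arena.
  }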
 
diff --git a/src/google/protobuf/arena_cleanup.h b/src/google/protobuf/arena_cleanup.h
index d48661f..00c09f6 100644
--- a/src/google/protobuf/arena_cleanup.h
+++ b/src/google/protobuf/arena_cleanup.h
@@ -31,14 +31,16 @@
 #ifndef GOOGLE_PROTOBUF_ARENA_CLEANUP_H__
 #define GOOGLE_PROTOBUF_ARENA_CLEANUP_H__
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <string>
+#include <type_traits>
 
 #include "absl/base/attributes.h"
 #include "google/protobuf/stubs/logging.h"
-#include "google/protobuf/stubs/logging.h"
 #include "absl/strings/cord.h"
+#include "google/protobuf/arena_align.h"
 
 
 // Must be included last.
@@ -59,9 +61,10 @@
 // lowest 2 bits of the `elem` value identifying the type of node. All node
 // types must start with a `uintptr_t` that stores `Tag` in its low two bits.
 enum class Tag : uintptr_t {
-  kDynamic = 0,  // DynamicNode
-  kString = 1,   // TaggedNode (std::string)
-  kCord = 2,     // TaggedNode (absl::Cord)
+  kDynamic = 0,         // DynamicNode
+  kString = 1,          // TaggedNode (std::string)
+  kCord = 2,            // TaggedNode (absl::Cord)
+  kEmbeddedString = 3,  // TaggedNode (std::string)
 };
 
 // DynamicNode contains the object (`elem`) that needs to be
@@ -72,57 +75,112 @@
   void (*destructor)(void*);
 };
 
+// std::max is not constexpr in C++11
+template <typename T>
+inline constexpr T Max(T lhs, T rhs) {
+  return lhs < rhs ? rhs : lhs;
+}
+
 // TaggedNode contains a `std::string` or `absl::Cord` object (`elem`) that
 // needs to be destroyed. The lowest 2 bits of `elem` contain the non-zero
-// `kString` or `kCord` tag.
-struct TaggedNode {
+// `kString` or `kCord` tag. TaggedNode must have an alignment matching at
+// least the alignment of std::string and absl::Cord.
+struct alignas(Max(alignof(std::string), alignof(absl::Cord))) TaggedNode {
   uintptr_t elem;
 };
 
-// EnableSpecializedTags() return true if the alignment of tagged objects
+// EnableTags() returns true if the alignment of tagged objects
 // such as std::string allow us to poke tags in the 2 LSB bits.
-inline constexpr bool EnableSpecializedTags() {
+inline constexpr bool EnableTags() {
   // For now we require 2 bits
   return alignof(std::string) >= 8 && alignof(absl::Cord) >= 8;
 }
 
-// Adds a cleanup entry identified by `tag` at memory location `pos`.
-inline ABSL_ATTRIBUTE_ALWAYS_INLINE void CreateNode(Tag tag, void* pos,
-                                                    const void* elem_raw,
-                                                    void (*destructor)(void*)) {
-  auto elem = reinterpret_cast<uintptr_t>(elem_raw);
-  if (EnableSpecializedTags()) {
-    GOOGLE_ABSL_DCHECK_EQ(elem & 3, 0ULL);  // Must be aligned
-    switch (tag) {
-      case Tag::kString: {
-        TaggedNode n = {elem | static_cast<uintptr_t>(Tag::kString)};
-        memcpy(pos, &n, sizeof(n));
-        return;
-      }
-      case Tag::kCord: {
-        TaggedNode n = {elem | static_cast<uintptr_t>(Tag::kCord)};
-        memcpy(pos, &n, sizeof(n));
-        return;
-      }
-      default:
-        break;
-    }
-  }
-  DynamicNode n = {elem, destructor};
+// Adds a cleanup entry invoking `cleanup` on `object` at memory location `pos`.
+inline ABSL_ATTRIBUTE_ALWAYS_INLINE void CreateNode(void* pos,
+                                                    const void* object,
+                                                    void (*cleanup)(void*)) {
+  auto elem = reinterpret_cast<uintptr_t>(object);
+  DynamicNode n = {elem, cleanup};
   memcpy(pos, &n, sizeof(n));
 }
 
+// Adds a cleanup entry at `pos` invoking the destructor of an object that is
+// embedded directly after the node.
+inline ABSL_ATTRIBUTE_ALWAYS_INLINE void CreateNode(void* pos, Tag tag) {
+  GOOGLE_ABSL_DCHECK(tag == Tag::kEmbeddedString);
+  TaggedNode n = {static_cast<uintptr_t>(tag)};
+  memcpy(pos, &n, sizeof(n));
+}
+
+// Adds a cleanup entry invoking the destructor of `object`, whose type
+// is identified by `tag` at memory location `pos`.
+inline ABSL_ATTRIBUTE_ALWAYS_INLINE void CreateNode(void* pos,
+                                                    const void* object,
+                                                    Tag tag) {
+  GOOGLE_ABSL_DCHECK(tag == Tag::kString || tag == Tag::kCord);
+  GOOGLE_ABSL_DCHECK_NE(object, nullptr);
+  auto elem = reinterpret_cast<uintptr_t>(object);
+  GOOGLE_ABSL_DCHECK_EQ(elem & 3U, 0u);
+  TaggedNode n = {elem + static_cast<uintptr_t>(tag)};
+  memcpy(pos, &n, sizeof(n));
+}
+
+// Currently a no-op placeholder for a hardware prefetch hint on `address`.
+inline ABSL_ATTRIBUTE_ALWAYS_INLINE void Prefetch(const void* address) {
+}
+
+template <typename T>
+inline constexpr size_t NodeSize() {
+  return (std::is_same<T, std::string>::value ||
+          std::is_same<T, absl::Cord>::value)
+             ? sizeof(TaggedNode)
+             : sizeof(DynamicNode);
+}
+
+inline size_t CleanupSize(void (*)(void*)) {
+  return ArenaAlignDefault::Ceil(sizeof(DynamicNode));
+}
+inline size_t CleanupSize(Tag) {
+  return ArenaAlignDefault::Ceil(sizeof(TaggedNode));
+}
+
+template <typename T>
+inline constexpr size_t AllocationSize() {
+  return ArenaAlignDefault::Ceil(NodeSize<T>() + sizeof(T));
+}
+
+template <typename Node, typename T>
+T* NodePointerToObjectPointer(void* pos) {
+  return reinterpret_cast<T*>(reinterpret_cast<Node*>(pos) + 1);
+}
+
 // Optimization: performs a prefetch on `elem_address`.
 // Returns the size of the cleanup (meta) data at this address, allowing the
 // caller to advance cleanup iterators without needing to examine or know
 // anything about the underlying cleanup node or cleanup meta data / tags.
 inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t
 PrefetchNode(const void* elem_address) {
-  if (EnableSpecializedTags()) {
+  if (EnableTags()) {
     uintptr_t elem;
     memcpy(&elem, elem_address, sizeof(elem));
-    if (static_cast<Tag>(elem & 3) != Tag::kDynamic) {
-      return sizeof(TaggedNode);
+    uintptr_t tag = elem & 3U;
+    elem -= tag;
+    switch (static_cast<Tag>(tag)) {
+      case Tag::kEmbeddedString:
+        GOOGLE_ABSL_DCHECK_EQ(elem, 0U);
+        return AllocationSize<std::string>();
+
+      case Tag::kString:
+        GOOGLE_ABSL_DCHECK_NE(elem, 0U);
+        Prefetch(reinterpret_cast<const void*>(elem));
+        return ArenaAlignDefault::Ceil(sizeof(TaggedNode));
+
+      case Tag::kCord:
+        GOOGLE_ABSL_DCHECK_NE(elem, 0U);
+        Prefetch(reinterpret_cast<const void*>(elem));
+        return ArenaAlignDefault::Ceil(sizeof(TaggedNode));
+
+      case Tag::kDynamic:
+        break;
     }
   }
   return sizeof(DynamicNode);
@@ -132,23 +190,34 @@
 // Returns the size of the cleanup (meta) data at this address, allowing the
 // caller to advance cleanup iterators without needing to examine or know
 // anything about the underlying cleanup node or cleanup meta data / tags.
-inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t DestroyNode(const void* pos) {
+inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t DestroyNode(void* pos) {
   uintptr_t elem;
   memcpy(&elem, pos, sizeof(elem));
-  if (EnableSpecializedTags()) {
-    switch (static_cast<Tag>(elem & 3)) {
-      case Tag::kString: {
-        // Some compilers don't like fully qualified explicit dtor calls,
-        // so use an alias to avoid having to type `::`.
+  if (EnableTags()) {
+    uintptr_t tag = elem & 3U;
+    elem -= tag;
+    switch (static_cast<Tag>(tag)) {
+      case Tag::kEmbeddedString: {
         using T = std::string;
-        reinterpret_cast<T*>(elem - static_cast<uintptr_t>(Tag::kString))->~T();
-        return sizeof(TaggedNode);
+        GOOGLE_ABSL_DCHECK_EQ(elem, 0U);
+        NodePointerToObjectPointer<TaggedNode, T>(pos)->~T();
+        return AllocationSize<std::string>();
       }
+
+      case Tag::kString: {
+        using T = std::string;
+        GOOGLE_ABSL_DCHECK_NE(elem, 0U);
+        reinterpret_cast<T*>(elem)->~T();
+        return ArenaAlignDefault::Ceil(sizeof(TaggedNode));
+      }
+
       case Tag::kCord: {
         using T = absl::Cord;
-        reinterpret_cast<T*>(elem - static_cast<uintptr_t>(Tag::kCord))->~T();
-        return sizeof(TaggedNode);
+        GOOGLE_ABSL_DCHECK_NE(elem, 0U);
+        reinterpret_cast<T*>(elem)->~T();
+        return ArenaAlignDefault::Ceil(sizeof(TaggedNode));
       }
+
       default:
         break;
     }
@@ -158,62 +227,6 @@
   return sizeof(DynamicNode);
 }
 
-// Returns the `tag` identifying the type of object for `destructor` or
-// kDynamic if `destructor` does not identify a well know object type.
-inline ABSL_ATTRIBUTE_ALWAYS_INLINE Tag Type(void (*destructor)(void*)) {
-  if (EnableSpecializedTags()) {
-    if (destructor == &arena_destruct_object<std::string>) {
-      return Tag::kString;
-    }
-    if (destructor == &arena_destruct_object<absl::Cord>) {
-      return Tag::kCord;
-    }
-  }
-  return Tag::kDynamic;
-}
-
-// Returns the `tag` identifying the type of object stored at memory location
-// `elem`, which represents the first uintptr_t value in the node.
-inline ABSL_ATTRIBUTE_ALWAYS_INLINE Tag Type(void* raw) {
-  if (!EnableSpecializedTags()) return Tag::kDynamic;
-
-  uintptr_t elem;
-  memcpy(&elem, raw, sizeof(elem));
-  switch (static_cast<Tag>(elem & 0x7ULL)) {
-    case Tag::kDynamic:
-      return Tag::kDynamic;
-    case Tag::kString:
-      return Tag::kString;
-    case Tag::kCord:
-      return Tag::kCord;
-    default:
-      GOOGLE_ABSL_LOG(FATAL) << "Corrupted cleanup tag: " << (elem & 0x7ULL);
-      return Tag::kDynamic;
-  }
-}
-
-// Returns the required size in bytes off the node type identified by `tag`.
-inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(Tag tag) {
-  if (!EnableSpecializedTags()) return sizeof(DynamicNode);
-
-  switch (tag) {
-    case Tag::kDynamic:
-      return sizeof(DynamicNode);
-    case Tag::kString:
-      return sizeof(TaggedNode);
-    case Tag::kCord:
-      return sizeof(TaggedNode);
-    default:
-      GOOGLE_ABSL_LOG(FATAL) << "Corrupted cleanup tag: " << static_cast<int>(tag);
-      return sizeof(DynamicNode);
-  }
-}
-
-// Returns the required size in bytes off the node type for `destructor`.
-inline ABSL_ATTRIBUTE_ALWAYS_INLINE size_t Size(void (*destructor)(void*)) {
-  return destructor == nullptr ? 0 : Size(Type(destructor));
-}
-
 }  // namespace cleanup
 }  // namespace internal
 }  // namespace protobuf
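
A minimal standalone sketch of the 2-bit tagging scheme used by TaggedNode
above (illustrative names; it relies only on the pointee being at least
4-byte aligned, which the DCHECKs in CreateNode assert):

  #include <cstdint>

  enum class Tag : uintptr_t { kDynamic = 0, kString = 1, kCord = 2, kEmbeddedString = 3 };

  // Packs `tag` into the two low bits of an aligned pointer.
  uintptr_t Pack(const void* object, Tag tag) {
    uintptr_t elem = reinterpret_cast<uintptr_t>(object);
    // Precondition: (elem & 3) == 0, i.e. `object` is at least 4-byte aligned.
    return elem | static_cast<uintptr_t>(tag);
  }

  // Recovers the tag and the original pointer.
  Tag UnpackTag(uintptr_t elem) { return static_cast<Tag>(elem & 3u); }
  void* UnpackPointer(uintptr_t elem) {
    return reinterpret_cast<void*>(elem & ~uintptr_t{3});
  }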
diff --git a/src/google/protobuf/serial_arena.h b/src/google/protobuf/serial_arena.h
new file mode 100644
index 0000000..90c85b3
--- /dev/null
+++ b/src/google/protobuf/serial_arena.h
@@ -0,0 +1,532 @@
+// Protocol Buffers - Google's data interchange format
+// Copyright 2022 Google Inc.  All rights reserved.
+// https://developers.google.com/protocol-buffers/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// This file defines the internal class SerialArena
+
+#ifndef GOOGLE_PROTOBUF_SERIAL_ARENA_H__
+#define GOOGLE_PROTOBUF_SERIAL_ARENA_H__
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <typeinfo>
+#include <utility>
+
+#include "google/protobuf/stubs/logging.h"
+#include "google/protobuf/stubs/common.h"
+#include "absl/numeric/bits.h"
+#include "google/protobuf/arena_align.h"
+#include "google/protobuf/arena_cleanup.h"
+#include "google/protobuf/arena_config.h"
+#include "google/protobuf/arenaz_sampler.h"
+#include "google/protobuf/port.h"
+
+
+// Must be included last.
+#include "google/protobuf/port_def.inc"
+
+namespace google {
+namespace protobuf {
+namespace internal {
+
+template <typename T>
+class TypedBlock {
+ public:
+  static constexpr size_t AllocSize(size_t count) {
+    return sizeof(TypedBlock) + sizeof(T) * count;
+  }
+
+  static TypedBlock* Emplace(void* mem, size_t size, TypedBlock* next) {
+    return new (mem) TypedBlock((size - sizeof(TypedBlock)) / sizeof(T), next);
+  }
+
+  static TypedBlock* Create(size_t count, TypedBlock* next) {
+    size_t sz = AllocSize(count);
+    return Emplace(::operator new(sz), sz, next);
+  }
+
+  static void Delete(TypedBlock* block) {
+    internal::SizedDelete(block, AllocSize(block->capacity_));
+  }
+
+  void DestroyAll() {
+    for (T& t : *this) t.~T();
+  }
+
+  static TypedBlock* sentinel();
+
+  TypedBlock* next() const { return next_; }
+  size_t capacity() const { return capacity_; }
+
+  T* begin() { return reinterpret_cast<T*>(this + 1); }
+  T* end() { return reinterpret_cast<T*>(this + 1) + count_; }
+
+  inline T* TryAllocate() {
+    if (ABSL_PREDICT_TRUE(count_ < capacity_)) {
+      return reinterpret_cast<T*>(this + 1) + count_++;
+    }
+    return nullptr;
+  }
+
+  inline T* Allocate() {
+    GOOGLE_ABSL_DCHECK_LT(count_, capacity_);
+    return reinterpret_cast<T*>(this + 1) + count_++;
+  }
+
+ private:
+  TypedBlock() = default;
+  ~TypedBlock() = default;
+  TypedBlock(size_t capacity, TypedBlock* next)
+      : next_(next), capacity_(static_cast<uint32_t>(capacity)) {
+    GOOGLE_ABSL_DCHECK_LE(capacity, std::numeric_limits<uint32_t>::max());
+  }
+
+  struct Sentinel;
+
+  struct alignas(T) {
+    TypedBlock* const next_ = nullptr;
+    const uint32_t capacity_ = 0;
+    uint32_t count_ = 0;
+  };
+};
+
+template <typename T>
+struct TypedBlock<T>::Sentinel {
+  static constexpr TypedBlock<T> kSentinel;
+};
+
+template <typename T>
+inline TypedBlock<T>* TypedBlock<T>::sentinel() {
+  return const_cast<TypedBlock<T>*>(&Sentinel::kSentinel);
+}
+
+
+// Arena blocks are variable length malloc-ed objects.  The following structure
+// describes the common header for all blocks.
+struct ArenaBlock {
+  // For the sentry block with zero-size where ptr_, limit_, cleanup_nodes all
+  // point to "this".
+  constexpr ArenaBlock()
+      : next(nullptr), cleanup_nodes(this), size(0) {}
+
+  ArenaBlock(ArenaBlock* next, size_t size)
+      : next(next), cleanup_nodes(nullptr), size(size) {
+    GOOGLE_DCHECK_GT(size, sizeof(ArenaBlock));
+  }
+
+  char* Pointer(size_t n) {
+    GOOGLE_DCHECK_LE(n, size);
+    return reinterpret_cast<char*>(this) + n;
+  }
+  char* Limit() { return Pointer(size & static_cast<size_t>(-8)); }
+
+  bool IsSentry() const { return size == 0; }
+
+  ArenaBlock* const next;
+  void* cleanup_nodes;
+  const size_t size;
+  // data follows
+};
+
+enum class AllocationClient { kDefault, kArray };
+
+class ThreadSafeArena;
+
+// Tag type used to invoke the constructor of the first SerialArena.
+struct FirstSerialArena {
+  explicit FirstSerialArena() = default;
+};
+
+// A simple arena allocator. Calls to allocate functions must be properly
+// serialized by the caller, hence this class cannot be used as a general
+// purpose allocator in a multi-threaded program. It serves as a building block
+// for ThreadSafeArena, which provides a thread-safe arena allocator.
+//
+// This class manages
+// 1) Arena bump allocation + owning memory blocks.
+// 2) Maintaining a cleanup list.
+// It delegates the actual memory allocation back to ThreadSafeArena, which
+// holds the block growth policy and the backing allocation information.
+class PROTOBUF_EXPORT SerialArena {
+ public:
+  struct Memory {
+    void* ptr;
+    size_t size;
+  };
+
+  void CleanupList();
+  uint64_t SpaceAllocated() const {
+    return space_allocated_.load(std::memory_order_relaxed);
+  }
+  uint64_t SpaceUsed() const;
+
+  bool HasSpace(size_t n) const {
+    return n <= static_cast<size_t>(limit_ - ptr());
+  }
+
+  // See comments on `cached_blocks_` member for details.
+  PROTOBUF_ALWAYS_INLINE void* TryAllocateFromCachedBlock(size_t size) {
+    if (PROTOBUF_PREDICT_FALSE(size < 16)) return nullptr;
+    // We round up to the next larger block in case the memory doesn't match
+    // the pattern we are looking for.
+    const size_t index = absl::bit_width(size - 1) - 4;
+
+    if (index >= cached_block_length_) return nullptr;
+    auto& cached_head = cached_blocks_[index];
+    if (cached_head == nullptr) return nullptr;
+
+    void* ret = cached_head;
+    PROTOBUF_UNPOISON_MEMORY_REGION(ret, size);
+    cached_head = cached_head->next;
+    return ret;
+  }
+
+  // In kArray mode we look through cached blocks.
+  // We do not do this by default because most non-array allocations will not
+  // have the right size and will fail to find an appropriate cached block.
+  //
+  // TODO(sbenza): Evaluate if we should use cached blocks for message types of
+  // the right size. We can statically know if the allocation size can benefit
+  // from it.
+  template <AllocationClient alloc_client = AllocationClient::kDefault>
+  void* AllocateAligned(size_t n) {
+    GOOGLE_DCHECK(internal::ArenaAlignDefault::IsAligned(n));
+    GOOGLE_DCHECK_GE(limit_, ptr());
+
+    if (alloc_client == AllocationClient::kArray) {
+      if (void* res = TryAllocateFromCachedBlock(n)) {
+        return res;
+      }
+    }
+
+    if (PROTOBUF_PREDICT_FALSE(!HasSpace(n))) {
+      return AllocateAlignedFallback(n);
+    }
+    return AllocateFromExisting(n);
+  }
+
+  template <typename Align, typename TagOrDtor>
+  void* AllocateWithCleanup(size_t size, Align align, TagOrDtor cleanup);
+
+  template <typename Align, typename TagOrDtor>
+  void* TryAllocateWithCleanup(size_t size, Align align, TagOrDtor cleanup);
+
+  std::string* AllocateString();
+  std::string* TryAllocateString();
+
+  void* AllocateEmbedded(size_t size, cleanup::Tag tag);
+  void* TryAllocateEmbedded(size_t size, cleanup::Tag tag);
+
+ private:
+  void* AllocateFromExisting(size_t n) {
+    PROTOBUF_UNPOISON_MEMORY_REGION(ptr(), n);
+    void* ret = ptr();
+    set_ptr(static_cast<char*>(ret) + n);
+    return ret;
+  }
+
+  // See comments on `cached_blocks_` member for details.
+  void ReturnArrayMemory(void* p, size_t size) {
+    // We only need to check for 32-bit platforms.
+    // In 64-bit platforms the minimum allocation size from Repeated*Field will
+    // be 16 guaranteed.
+    if (sizeof(void*) < 8) {
+      if (PROTOBUF_PREDICT_FALSE(size < 16)) return;
+    } else {
+      PROTOBUF_ASSUME(size >= 16);
+    }
+
+    // We round down to the next smaller block in case the memory doesn't match
+    // the pattern we are looking for. eg, someone might have called Reserve()
+    // on the repeated field.
+    const size_t index = absl::bit_width(size) - 5;
+
+    if (PROTOBUF_PREDICT_FALSE(index >= cached_block_length_)) {
+      // We can't put this object on the freelist so make this object the
+      // freelist. It is guaranteed it is larger than the one we have, and
+      // large enough to hold another allocation of `size`.
+      CachedBlock** new_list = static_cast<CachedBlock**>(p);
+      size_t new_size = size / sizeof(CachedBlock*);
+
+      std::copy(cached_blocks_, cached_blocks_ + cached_block_length_,
+                new_list);
+
+      // We need to unpoison this memory before filling it in case it has been
+      // poisoned by another sanitizer client.
+      PROTOBUF_UNPOISON_MEMORY_REGION(
+          new_list + cached_block_length_,
+          (new_size - cached_block_length_) * sizeof(CachedBlock*));
+
+      std::fill(new_list + cached_block_length_, new_list + new_size, nullptr);
+
+      cached_blocks_ = new_list;
+      // Make the size fit in uint8_t. This is the power of two, so we don't
+      // need anything larger.
+      cached_block_length_ =
+          static_cast<uint8_t>(std::min(size_t{64}, new_size));
+
+      return;
+    }
+
+    auto& cached_head = cached_blocks_[index];
+    auto* new_node = static_cast<CachedBlock*>(p);
+    new_node->next = cached_head;
+    cached_head = new_node;
+    PROTOBUF_POISON_MEMORY_REGION(p, size);
+  }
+
+ public:
+  // Allocate space if the current region provides enough space.
+  bool MaybeAllocateAligned(size_t n, void** out) {
+    GOOGLE_DCHECK(internal::ArenaAlignDefault::IsAligned(n));
+    GOOGLE_DCHECK_GE(limit_, ptr());
+    if (PROTOBUF_PREDICT_FALSE(!HasSpace(n))) return false;
+    *out = AllocateFromExisting(n);
+    return true;
+  }
+
+  // If there is enough space in the current block, allocate space for one `T`
+  // object and register for destruction. The object has not been constructed
+  // and the memory returned is uninitialized.
+  template <typename T>
+  void* MaybeAllocateWithCleanup();
+
+  template <typename TagOrCleanup>
+  void AddCleanup(void* elem, TagOrCleanup cleanup);
+
+ private:
+  using StringBlock = TypedBlock<std::string>;
+  friend class ThreadSafeArena;
+
+  // Creates a new SerialArena inside mem, using the remaining memory for
+  // future allocations.
+  // The `parent` arena must outlive the serial arena, which is guaranteed
+  // because the parent manages the lifetime of the serial arenas.
+  static SerialArena* New(SerialArena::Memory mem, ThreadSafeArena& parent);
+  // Free SerialArena returning the memory passed in to New
+  template <typename Deallocator>
+  Memory Free(Deallocator deallocator);
+
+  // Members are declared here to track sizeof(SerialArena) and hotness
+  // centrally. They are (roughly) laid out in descending order of hotness.
+
+  // Next pointer to allocate from.  Always 8-byte aligned.  Points inside
+  // head_ (and head_->pos will always be non-canonical).  We keep these
+  // here to reduce indirection.
+  std::atomic<char*> ptr_{nullptr};
+  // Limiting address up to which memory can be allocated from the head block.
+  char* limit_ = nullptr;
+  StringBlock* strings_ = StringBlock::sentinel();
+
+  std::atomic<ArenaBlock*> head_{nullptr};  // Head of linked list of blocks.
+  std::atomic<size_t> space_used_{0};       // Necessary for metrics.
+  std::atomic<size_t> space_allocated_{0};
+  ThreadSafeArena& parent_;
+
+  // Repeated*Field and Arena play together to reduce memory consumption by
+  // reusing blocks. Currently, natural growth of the repeated field types makes
+  // them allocate blocks of size `8 + 2^N, N>=3`.
+  // When the repeated field grows, it returns the previous block and we put it
+  // in this free list.
+  // `cached_blocks_[i]` points to the free list for blocks of size `8+2^(i+3)`.
+  // The array of freelists is grown when needed in `ReturnArrayMemory()`.
+  struct CachedBlock {
+    // Simple linked list.
+    CachedBlock* next;
+  };
+  uint8_t cached_block_length_ = 0;
+  CachedBlock** cached_blocks_ = nullptr;
+
+  // Helper getters/setters to handle relaxed operations on atomic variables.
+  ArenaBlock* head() { return head_.load(std::memory_order_relaxed); }
+  const ArenaBlock* head() const {
+    return head_.load(std::memory_order_relaxed);
+  }
+
+  char* ptr() { return ptr_.load(std::memory_order_relaxed); }
+  const char* ptr() const { return ptr_.load(std::memory_order_relaxed); }
+  void set_ptr(char* ptr) { return ptr_.store(ptr, std::memory_order_relaxed); }
+
+  // Constructor is private as only New() should be used.
+  inline SerialArena(ArenaBlock* b, ThreadSafeArena& parent);
+
+  // Constructors to handle the first SerialArena.
+  inline explicit SerialArena(ThreadSafeArena& parent);
+  inline SerialArena(FirstSerialArena, ArenaBlock* b, ThreadSafeArena& parent);
+
+  void* AllocateAlignedFallback(size_t n);
+
+  template <typename TagOrCleanup>
+  void BlindlyAddCleanup(void* elem, TagOrCleanup cleanup);
+
+  template <typename TagOrCleanup>
+  void AddCleanupFallback(void* elem, TagOrCleanup cleanup);
+
+  template <typename Align, typename TagOrDtor>
+  void* BlindlyAllocateWithCleanup(size_t size, Align align, TagOrDtor cleanup);
+
+  template <typename Align, typename TagOrDtor>
+  void* AllocateWithCleanupFallback(size_t size, Align align,
+                                    TagOrDtor cleanup);
+
+  void* BlindlyAllocateEmbedded(size_t size, cleanup::Tag tag);
+  void* AllocateEmbeddedFallback(size_t size, cleanup::Tag tag);
+
+  std::string* AllocateStringFallback();
+
+  inline void AllocateNewBlock(size_t n);
+  inline void Init(ArenaBlock* b, size_t offset);
+
+ public:
+  static constexpr size_t kBlockHeaderSize =
+      ArenaAlignDefault::Ceil(sizeof(ArenaBlock));
+};
+
+template <typename TagOrCleanup>
+inline PROTOBUF_ALWAYS_INLINE void SerialArena::AddCleanup(
+    void* elem, TagOrCleanup cleanup) {
+  const size_t n = cleanup::CleanupSize(cleanup);
+  if (PROTOBUF_PREDICT_FALSE(!HasSpace(n))) {
+    return AddCleanupFallback(elem, cleanup);
+  }
+  BlindlyAddCleanup(elem, cleanup);
+}
+
+template <typename TagOrCleanup>
+inline PROTOBUF_ALWAYS_INLINE void SerialArena::BlindlyAddCleanup(
+    void* elem, TagOrCleanup cleanup) {
+  const size_t n = cleanup::CleanupSize(cleanup);
+  GOOGLE_DCHECK(HasSpace(n));
+  limit_ -= n;
+  PROTOBUF_UNPOISON_MEMORY_REGION(limit_, n);
+  cleanup::CreateNode(limit_, elem, cleanup);
+}
+
+template <typename Align, typename TagOrDtor>
+inline PROTOBUF_ALWAYS_INLINE void* SerialArena::BlindlyAllocateWithCleanup(
+    size_t size, Align align, TagOrDtor cleanup) {
+  GOOGLE_DCHECK(align.IsAligned(size));
+  char* ptr = align.CeilDefaultAligned(this->ptr());
+  PROTOBUF_UNPOISON_MEMORY_REGION(ptr, size);
+  BlindlyAddCleanup(ptr, cleanup);
+  GOOGLE_DCHECK_LE(ptr + size, limit_);
+  set_ptr(ptr + size);
+  return ptr;
+}
+
+template <typename Align, typename TagOrDtor>
+inline PROTOBUF_NDEBUG_INLINE void* SerialArena::AllocateWithCleanup(
+    size_t size, Align align, TagOrDtor cleanup) {
+  const size_t n = align.Padded(size) + cleanup::CleanupSize(cleanup);
+  if (PROTOBUF_PREDICT_TRUE(HasSpace(n))) {
+    return BlindlyAllocateWithCleanup(size, align, cleanup);
+  }
+  return AllocateWithCleanupFallback(size, align, cleanup);
+}
+
+template <typename Align, typename TagOrDtor>
+inline PROTOBUF_NDEBUG_INLINE void* SerialArena::TryAllocateWithCleanup(
+    size_t size, Align align, TagOrDtor cleanup) {
+  const size_t n = align.Padded(size) + cleanup::CleanupSize(cleanup);
+  if (PROTOBUF_PREDICT_FALSE(!HasSpace(n))) return nullptr;
+  void* ptr = BlindlyAllocateWithCleanup(size, align, cleanup);
+  PROTOBUF_ASSUME(ptr != nullptr);
+  return ptr;
+}
+
+template <typename T>
+inline PROTOBUF_ALWAYS_INLINE void* SerialArena::MaybeAllocateWithCleanup() {
+  static_assert(!std::is_trivially_destructible<T>::value, "");
+  constexpr auto align = internal::ArenaAlignOf<T>();
+  return TryAllocateWithCleanup(align.Ceil(sizeof(T)), align,
+                                cleanup::arena_destruct_object<T>);
+}
+
+template <>
+inline PROTOBUF_ALWAYS_INLINE void*
+SerialArena::MaybeAllocateWithCleanup<std::string>() {
+  return TryAllocateString();
+}
+
+template <>
+inline PROTOBUF_ALWAYS_INLINE void*
+SerialArena::MaybeAllocateWithCleanup<absl::Cord>() {
+  return TryAllocateWithCleanup(cleanup::AllocationSize<absl::Cord>(),
+                                ArenaAlignDefault(), cleanup::Tag::kCord);
+}
+
+inline PROTOBUF_NDEBUG_INLINE void* SerialArena::BlindlyAllocateEmbedded(
+    size_t size, cleanup::Tag tag) {
+  GOOGLE_DCHECK(HasSpace(size));
+  limit_ -= size;
+  PROTOBUF_UNPOISON_MEMORY_REGION(limit_, size);
+  cleanup::CreateNode(limit_, tag);
+  return limit_ + sizeof(cleanup::TaggedNode);
+}
+
+inline PROTOBUF_NDEBUG_INLINE void* SerialArena::AllocateEmbedded(
+    size_t size, cleanup::Tag tag) {
+  if (PROTOBUF_PREDICT_TRUE(HasSpace(size))) {
+    return BlindlyAllocateEmbedded(size, tag);
+  }
+  return AllocateEmbeddedFallback(size, tag);
+}
+
+inline PROTOBUF_NDEBUG_INLINE void* SerialArena::TryAllocateEmbedded(
+    size_t size, cleanup::Tag tag) {
+  const size_t n = sizeof(cleanup::TaggedNode) + size;
+  if (PROTOBUF_PREDICT_FALSE(!HasSpace(n))) return nullptr;
+  void* ptr = BlindlyAllocateEmbedded(size, tag);
+  PROTOBUF_ASSUME(ptr != nullptr);
+  return ptr;
+}
+
+inline PROTOBUF_NDEBUG_INLINE std::string* SerialArena::AllocateString() {
+  std::string* s = strings_->TryAllocate();
+  if (PROTOBUF_PREDICT_TRUE(s != nullptr)) return s;
+  return AllocateStringFallback();
+}
+
+inline PROTOBUF_NDEBUG_INLINE std::string* SerialArena::TryAllocateString() {
+  return strings_->TryAllocate();
+}
+
+}  // namespace internal
+}  // namespace protobuf
+}  // namespace google
+
+#include "google/protobuf/port_undef.inc"
+
+#endif  // GOOGLE_PROTOBUF_SERIAL_ARENA_H__
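
For illustration, the kEmbeddedString layout produced by
BlindlyAllocateEmbedded, a node header immediately followed by the object it
owns, can be reproduced in isolation as follows (buffer and names are
stand-ins, not arena code):

  #include <cstdint>
  #include <new>
  #include <string>

  struct FakeTaggedNode { uintptr_t elem; };

  void EmbeddedLayoutDemo() {
    // One node header followed directly by storage for the object.
    alignas(std::string) unsigned char
        buf[sizeof(FakeTaggedNode) + sizeof(std::string)];
    auto* node = new (buf) FakeTaggedNode{uintptr_t{3}};  // kEmbeddedString
    new (node + 1) std::string("embedded");
    // Cleanup later recovers the object as `node + 1`; no extra pointer needed.
    using T = std::string;
    reinterpret_cast<T*>(node + 1)->~T();
  }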
diff --git a/src/google/protobuf/thread_safe_arena.h b/src/google/protobuf/thread_safe_arena.h
new file mode 100644
index 0000000..757763c
--- /dev/null
+++ b/src/google/protobuf/thread_safe_arena.h
@@ -0,0 +1,320 @@
+// Protocol Buffers - Google's data interchange format
+// Copyright 2022 Google Inc.  All rights reserved.
+// https://developers.google.com/protocol-buffers/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// This file defines the internal class ThreadSafeArena
+
+#ifndef GOOGLE_PROTOBUF_THREAD_SAFE_ARENA_H__
+#define GOOGLE_PROTOBUF_THREAD_SAFE_ARENA_H__
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <type_traits>
+#include <utility>
+
+#include "absl/synchronization/mutex.h"
+#include "google/protobuf/arena_align.h"
+#include "google/protobuf/arena_allocation_policy.h"
+#include "google/protobuf/arena_cleanup.h"
+#include "google/protobuf/arena_config.h"
+#include "google/protobuf/arenaz_sampler.h"
+#include "google/protobuf/port.h"
+#include "google/protobuf/serial_arena.h"
+
+// Must be included last.
+#include "google/protobuf/port_def.inc"
+
+namespace google {
+namespace protobuf {
+namespace internal {
+
+// Use #ifdef to select the best implementation based on hardware / OS.
+class PROTOBUF_EXPORT ThreadSafeArena {
+ public:
+  ThreadSafeArena();
+
+  ThreadSafeArena(char* mem, size_t size);
+
+  explicit ThreadSafeArena(void* mem, size_t size,
+                           const AllocationPolicy& policy);
+
+  // All protos have pointers back to the arena hence Arena must have
+  // pointer stability.
+  ThreadSafeArena(const ThreadSafeArena&) = delete;
+  ThreadSafeArena& operator=(const ThreadSafeArena&) = delete;
+  ThreadSafeArena(ThreadSafeArena&&) = delete;
+  ThreadSafeArena& operator=(ThreadSafeArena&&) = delete;
+
+  // Destructor deletes all owned heap allocated objects, and destructs objects
+  // that have non-trivial destructors, except for proto2 message objects whose
+  // destructors can be skipped. Also, frees all blocks except the initial block
+  // if it was passed in.
+  ~ThreadSafeArena();
+
+  uint64_t Reset();
+
+  uint64_t SpaceAllocated() const;
+  uint64_t SpaceUsed() const;
+
+  template <AllocationClient alloc_client = AllocationClient::kDefault>
+  void* AllocateAligned(size_t n) {
+    SerialArena* arena;
+    if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
+      return arena->AllocateAligned<alloc_client>(n);
+    } else {
+      return AllocateAlignedFallback<alloc_client>(n);
+    }
+  }
+
+  void ReturnArrayMemory(void* p, size_t size) {
+    SerialArena* arena;
+    if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
+      arena->ReturnArrayMemory(p, size);
+    }
+  }
+
+  // This function allocates n bytes if the common happy case is true and
+  // returns true. Otherwise it does nothing and returns false. These strange
+  // semantics are necessary to allow callers to program functions that only
+  // have fallback function calls in tail position. This substantially improves
+  // code for the happy path.
+  PROTOBUF_NDEBUG_INLINE bool MaybeAllocateAligned(size_t n, void** out) {
+    SerialArena* arena;
+    if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
+      return arena->MaybeAllocateAligned(n, out);
+    }
+    return false;
+  }
+
+  // Add object pointer and cleanup function pointer to the list.
+  template <typename TagOrCleanup>
+  void AddCleanup(void* elem, TagOrCleanup cleanup);
+
+  template <typename Align, typename TagOrDtor>
+  void* AllocateWithCleanup(size_t size, Align align, TagOrDtor cleanup) {
+    SerialArena* arena;
+    if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
+      return arena->AllocateWithCleanup(size, align, cleanup);
+    }
+    return AllocateWithCleanupFallback(size, align, cleanup);
+  }
+
+  void* AllocateEmbedded(size_t size, cleanup::Tag tag) {
+    SerialArena* arena;
+    if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
+      return arena->AllocateEmbedded(size, tag);
+    }
+    return AllocateEmbeddedFallback(size, tag);
+  }
+
+  std::string* AllocateString() {
+    SerialArena* arena;
+    if (PROTOBUF_PREDICT_TRUE(GetSerialArenaFast(&arena))) {
+      return arena->AllocateString();
+    }
+    return AllocateStringFallback();
+  }
+
+ private:
+  friend class ArenaBenchmark;
+  friend class TcParser;
+  friend class SerialArena;
+  friend struct SerialArenaChunkHeader;
+  static uint64_t GetNextLifeCycleId();
+
+  class SerialArenaChunk;
+
+  // Returns a new SerialArenaChunk that has {id, serial} at slot 0. It may
+  // grow based on `prev_capacity`.
+  static SerialArenaChunk* NewSerialArenaChunk(uint32_t prev_capacity, void* id,
+                                               SerialArena* serial);
+  static SerialArenaChunk* SentrySerialArenaChunk();
+
+  // Returns the first ArenaBlock* for the first SerialArena. If users provide
+  // one, use it if it's acceptable. Otherwise returns a sentry block.
+  ArenaBlock* FirstBlock(void* buf, size_t size);
+  // Same as the above but returns a valid block if "policy" is not default.
+  ArenaBlock* FirstBlock(void* buf, size_t size,
+                         const AllocationPolicy& policy);
+
+  // Adds SerialArena to the chunked list. May create a new chunk.
+  void AddSerialArena(void* id, SerialArena* serial);
+
+  // Members are declared here to track sizeof(ThreadSafeArena) and hotness
+  // centrally.
+
+  // Unique for each arena. Changes on Reset().
+  uint64_t tag_and_id_ = 0;
+
+  TaggedAllocationPolicyPtr alloc_policy_;  // Tagged pointer to AllocPolicy.
+  ThreadSafeArenaStatsHandle arena_stats_;
+
+  // Adding a new chunk to head_ must be protected by mutex_.
+  absl::Mutex mutex_;
+  // Pointer to a linked list of SerialArenaChunk.
+  std::atomic<SerialArenaChunk*> head_{nullptr};
+
+  void* first_owner_;
+  // Must be declared after alloc_policy_; otherwise, it may lose info on
+  // user-provided initial block.
+  SerialArena first_arena_;
+
+  static_assert(std::is_trivially_destructible<SerialArena>{},
+                "SerialArena needs to be trivially destructible.");
+
+  const AllocationPolicy* AllocPolicy() const { return alloc_policy_.get(); }
+  void InitializeWithPolicy(const AllocationPolicy& policy);
+
+  void Init();
+
+  // Delete or Destruct all objects owned by the arena.
+  void CleanupList();
+
+  inline void CacheSerialArena(SerialArena* serial) {
+    thread_cache().last_serial_arena = serial;
+    thread_cache().last_lifecycle_id_seen = tag_and_id_;
+  }
+
+  PROTOBUF_NDEBUG_INLINE bool GetSerialArenaFast(SerialArena** arena) {
+    // If this thread already owns a block in this arena then try to use that.
+    // This fast path optimizes the case where multiple threads allocate from
+    // the same arena.
+    ThreadCache* tc = &thread_cache();
+    if (PROTOBUF_PREDICT_TRUE(tc->last_lifecycle_id_seen == tag_and_id_)) {
+      *arena = tc->last_serial_arena;
+      return true;
+    }
+    return false;
+  }
+
+  // Finds SerialArena or creates one if not found. When creating a new one,
+  // create a big enough block to accommodate n bytes.
+  SerialArena* GetSerialArenaFallback(size_t n);
+
+  template <AllocationClient alloc_client = AllocationClient::kDefault>
+  void* AllocateAlignedFallback(size_t n);
+
+  template <typename Align, typename TagOrDtor>
+  void* AllocateWithCleanupFallback(size_t size, Align align,
+                                    TagOrDtor cleanup);
+
+  void* AllocateEmbeddedFallback(size_t size, cleanup::Tag tag);
+  std::string* AllocateStringFallback();
+
+  // Executes callback function over SerialArenaChunk. Passes const
+  // SerialArenaChunk*.
+  template <typename Functor>
+  void WalkConstSerialArenaChunk(Functor fn) const;
+
+  // Executes callback function over SerialArenaChunk.
+  template <typename Functor>
+  void WalkSerialArenaChunk(Functor fn);
+
+  // Executes callback function over SerialArena in chunked list in reverse
+  // chronological order. Passes const SerialArena*.
+  template <typename Functor>
+  void PerConstSerialArenaInChunk(Functor fn) const;
+
+  // Releases all memory except the first block which it returns. The first
+  // block might be owned by the user and thus need some extra checks before
+  // deleting.
+  SerialArena::Memory Free(size_t* space_allocated);
+
+#ifdef _MSC_VER
+#pragma warning(disable : 4324)
+#endif
+  struct alignas(kCacheAlignment) ThreadCache {
+    // Number of per-thread lifecycle IDs to reserve. Must be power of two.
+    // To reduce contention on a global atomic, each thread reserves a batch of
+    // IDs.  The following number is calculated based on a stress test with
+    // ~6500 threads all frequently allocating a new arena.
+    static constexpr size_t kPerThreadIds = 256;
+    // Next lifecycle ID available to this thread. We need to reserve a new
+    // batch, if `next_lifecycle_id & (kPerThreadIds - 1) == 0`.
+    uint64_t next_lifecycle_id{0};
+    // The ThreadCache is considered valid as long as this matches the
+    // lifecycle_id of the arena being used.
+    uint64_t last_lifecycle_id_seen{static_cast<uint64_t>(-1)};
+    SerialArena* last_serial_arena{nullptr};
+  };
+
+  // lifecycle_id_ can be a highly contended variable when many arenas are
+  // created. Make sure that other global variables are not sharing its
+  // cacheline.
+#ifdef _MSC_VER
+#pragma warning(disable : 4324)
+#endif
+  using LifecycleId = uint64_t;
+  ABSL_CONST_INIT alignas(
+      kCacheAlignment) static std::atomic<LifecycleId> lifecycle_id_;
+#if defined(PROTOBUF_NO_THREADLOCAL)
+  // iOS does not support __thread keyword so we use a custom thread local
+  // storage class we implemented.
+  static ThreadCache& thread_cache();
+#elif defined(PROTOBUF_USE_DLLS)
+  // Thread local variables cannot be exposed through DLL interface but we can
+  // wrap them in static functions.
+  static ThreadCache& thread_cache();
+#else
+  static PROTOBUF_THREAD_LOCAL ThreadCache thread_cache_;
+  static ThreadCache& thread_cache() { return thread_cache_; }
+#endif
+
+ public:
+  // kBlockHeaderSize is sizeof(ArenaBlock), aligned up to the default alignment
+  // to protect the invariant that `pos` is always default aligned.
+  static constexpr size_t kBlockHeaderSize = SerialArena::kBlockHeaderSize;
+  static constexpr size_t kSerialArenaSize =
+      ArenaAlignDefault::Ceil(sizeof(SerialArena));
+  static constexpr size_t kAllocPolicySize =
+      ArenaAlignDefault::Ceil(sizeof(AllocationPolicy));
+  static constexpr size_t kMaxCleanupNodeSize = 16;
+  static_assert(ArenaAlignDefault::IsAligned(kBlockHeaderSize),
+                "kBlockHeaderSize must be default aligned.");
+  static_assert(ArenaAlignDefault::IsAligned(kSerialArenaSize),
+                "kSerialArenaSize must be default aligned.");
+};
+
+template <typename TagOrCleanup>
+inline void ThreadSafeArena::AddCleanup(void* elem, TagOrCleanup cleanup) {
+  SerialArena* arena;
+  if (PROTOBUF_PREDICT_FALSE(!GetSerialArenaFast(&arena))) {
+    arena = GetSerialArenaFallback(kMaxCleanupNodeSize);
+  }
+  arena->AddCleanup(elem, cleanup);
+}
+
+}  // namespace internal
+}  // namespace protobuf
+}  // namespace google
+
+#include "google/protobuf/port_undef.inc"
+
+#endif  // GOOGLE_PROTOBUF_THREAD_SAFE_ARENA_H__
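
For orientation, the thread-cache fast path in GetSerialArenaFast amounts to
comparing a per-thread cached lifecycle id against the arena's id. A
standalone sketch (the types and names are stand-ins, not the real classes):

  #include <cstdint>

  struct FakeSerialArena {};

  struct ThreadCacheSketch {
    uint64_t last_lifecycle_id_seen = static_cast<uint64_t>(-1);
    FakeSerialArena* last_serial_arena = nullptr;
  };

  thread_local ThreadCacheSketch thread_cache_sketch;

  // Returns true and the cached SerialArena if this thread touched `arena_id`
  // last; otherwise the caller falls back to the slow, mutex-protected lookup.
  bool GetSerialArenaFastSketch(uint64_t arena_id, FakeSerialArena** out) {
    if (thread_cache_sketch.last_lifecycle_id_seen == arena_id) {
      *out = thread_cache_sketch.last_serial_arena;
      return true;
    }
    return false;
  }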