Make arena fuse and inc/dec-ref const, so that they can be invoked concurrently.

This didn't require any change to the algorithm, except to mark one extra (immutable) member as atomic. It did, however, require changing the tests.

PiperOrigin-RevId: 690751567
diff --git a/upb/mem/BUILD b/upb/mem/BUILD
index 9fe9092..2afa190 100644
--- a/upb/mem/BUILD
+++ b/upb/mem/BUILD
@@ -46,6 +46,7 @@
     deps = [
         ":mem",
         "//upb:port",
+        "@com_google_absl//absl/base:core_headers",
         "@com_google_absl//absl/random",
         "@com_google_absl//absl/random:distributions",
         "@com_google_absl//absl/synchronization",
diff --git a/upb/mem/arena.c b/upb/mem/arena.c
index 435a1e1..7847943 100644
--- a/upb/mem/arena.c
+++ b/upb/mem/arena.c
@@ -157,7 +157,7 @@
 }
 #endif  // UPB_TRACING_ENABLED
 
-static upb_ArenaRoot _upb_Arena_FindRoot(upb_Arena* a) {
+static upb_ArenaRoot _upb_Arena_FindRoot(const upb_Arena* a) {
   upb_ArenaInternal* ai = upb_Arena_Internal(a);
   uintptr_t poc = upb_Atomic_Load(&ai->parent_or_count, memory_order_acquire);
   while (_upb_Arena_IsTaggedPointer(poc)) {
@@ -431,7 +431,8 @@
   upb_Atomic_Store(&parent->tail, parent_tail, memory_order_relaxed);
 }
 
-static upb_ArenaInternal* _upb_Arena_DoFuse(upb_Arena* a1, upb_Arena* a2,
+static upb_ArenaInternal* _upb_Arena_DoFuse(const upb_Arena* a1,
+                                            const upb_Arena* a2,
                                             uintptr_t* ref_delta) {
   // `parent_or_count` has two distinct modes
   // -  parent pointer mode
@@ -503,7 +504,7 @@
                                           memory_order_relaxed);
 }
 
-bool upb_Arena_Fuse(upb_Arena* a1, upb_Arena* a2) {
+bool upb_Arena_Fuse(const upb_Arena* a1, const upb_Arena* a2) {
   if (a1 == a2) return true;  // trivial fuse
 
 #ifdef UPB_TRACING_ENABLED
@@ -530,7 +531,7 @@
   }
 }
 
-bool upb_Arena_IncRefFor(upb_Arena* a, const void* owner) {
+bool upb_Arena_IncRefFor(const upb_Arena* a, const void* owner) {
   upb_ArenaInternal* ai = upb_Arena_Internal(a);
   if (_upb_ArenaInternal_HasInitialBlock(ai)) return false;
   upb_ArenaRoot r;
@@ -549,7 +550,9 @@
   goto retry;
 }
 
-void upb_Arena_DecRefFor(upb_Arena* a, const void* owner) { upb_Arena_Free(a); }
+void upb_Arena_DecRefFor(const upb_Arena* a, const void* owner) {
+  upb_Arena_Free((upb_Arena*)a);
+}
 
 void UPB_PRIVATE(_upb_Arena_SwapIn)(upb_Arena* des, const upb_Arena* src) {
   upb_ArenaInternal* desi = upb_Arena_Internal(des);
diff --git a/upb/mem/arena.h b/upb/mem/arena.h
index 2ad4f5d..846cb37 100644
--- a/upb/mem/arena.h
+++ b/upb/mem/arena.h
@@ -41,10 +41,10 @@
 UPB_API upb_Arena* upb_Arena_Init(void* mem, size_t n, upb_alloc* alloc);
 
 UPB_API void upb_Arena_Free(upb_Arena* a);
-UPB_API bool upb_Arena_Fuse(upb_Arena* a, upb_Arena* b);
+UPB_API bool upb_Arena_Fuse(const upb_Arena* a, const upb_Arena* b);
 
-bool upb_Arena_IncRefFor(upb_Arena* a, const void* owner);
-void upb_Arena_DecRefFor(upb_Arena* a, const void* owner);
+bool upb_Arena_IncRefFor(const upb_Arena* a, const void* owner);
+void upb_Arena_DecRefFor(const upb_Arena* a, const void* owner);
 
 size_t upb_Arena_SpaceAllocated(upb_Arena* a, size_t* fused_count);
 uint32_t upb_Arena_DebugRefCount(upb_Arena* a);
diff --git a/upb/mem/arena_test.cc b/upb/mem/arena_test.cc
index 2f0647f..43570b7 100644
--- a/upb/mem/arena_test.cc
+++ b/upb/mem/arena_test.cc
@@ -10,17 +10,20 @@
 #include <stddef.h>
 
 #include <array>
-#include <atomic>
+#include <memory>
 #include <thread>
 #include <vector>
 
 #include <gtest/gtest.h>
+#include "absl/base/thread_annotations.h"
 #include "absl/random/distributions.h"
 #include "absl/random/random.h"
+#include "absl/synchronization/mutex.h"
 #include "absl/synchronization/notification.h"
 #include "absl/time/clock.h"
 #include "absl/time/time.h"
 #include "upb/mem/alloc.h"
+#include "upb/mem/arena.hpp"
 
 // Must be last.
 #include "upb/port/def.inc"
@@ -66,39 +69,21 @@
 
 class Environment {
  public:
-  ~Environment() {
-    for (auto& atom : arenas_) {
-      auto* a = atom.load(std::memory_order_relaxed);
-      if (a != nullptr) upb_Arena_Free(a);
-    }
-  }
-
   void RandomNewFree(absl::BitGen& gen) {
-    auto* old = SwapRandomly(gen, upb_Arena_New());
-    if (old != nullptr) upb_Arena_Free(old);
+    auto a = std::make_shared<const upb::Arena>();
+    SwapRandomArena(gen, a);
   }
 
   void RandomIncRefCount(absl::BitGen& gen) {
-    auto* a = SwapRandomly(gen, nullptr);
-    if (a != nullptr) {
-      upb_Arena_IncRefFor(a, nullptr);
-      upb_Arena_DecRefFor(a, nullptr);
-      upb_Arena_Free(a);
-    }
+    std::shared_ptr<const upb::Arena> a = RandomNonNullArena(gen);
+    upb_Arena_IncRefFor(a->ptr(), nullptr);
+    upb_Arena_DecRefFor(a->ptr(), nullptr);
   }
 
   void RandomFuse(absl::BitGen& gen) {
-    std::array<upb_Arena*, 2> old;
-    for (auto& o : old) {
-      o = SwapRandomly(gen, nullptr);
-      if (o == nullptr) o = upb_Arena_New();
-    }
-
-    EXPECT_TRUE(upb_Arena_Fuse(old[0], old[1]));
-    for (auto& o : old) {
-      o = SwapRandomly(gen, o);
-      if (o != nullptr) upb_Arena_Free(o);
-    }
+    std::shared_ptr<const upb::Arena> a = RandomNonNullArena(gen);
+    std::shared_ptr<const upb::Arena> b = RandomNonNullArena(gen);
+    EXPECT_TRUE(upb_Arena_Fuse(a->ptr(), b->ptr()));
   }
 
   void RandomPoke(absl::BitGen& gen) {
@@ -115,12 +100,34 @@
   }
 
  private:
-  upb_Arena* SwapRandomly(absl::BitGen& gen, upb_Arena* a) {
-    return arenas_[absl::Uniform<size_t>(gen, 0, arenas_.size())].exchange(
-        a, std::memory_order_acq_rel);
+  size_t RandomIndex(absl::BitGen& gen) {
+    return absl::Uniform<size_t>(gen, 0, std::tuple_size<ArenaArray>::value);
   }
 
-  std::array<std::atomic<upb_Arena*>, 100> arenas_ = {};
+  // Swaps a random arena from the set with the given arena.
+  void SwapRandomArena(absl::BitGen& gen,
+                       std::shared_ptr<const upb::Arena>& a) {
+    size_t i = RandomIndex(gen);
+    absl::MutexLock lock(&mutex_);
+    arenas_[i].swap(a);
+  }
+
+  // Returns a random arena from the set, ensuring that the returned arena is
+  // non-null.
+  //
+  // Note that the returned arena is shared and may be accessed concurrently
+  // by other threads.
+  std::shared_ptr<const upb::Arena> RandomNonNullArena(absl::BitGen& gen) {
+    size_t i = RandomIndex(gen);
+    absl::MutexLock lock(&mutex_);
+    std::shared_ptr<const upb::Arena>& ret = arenas_[i];
+    if (!ret) ret = std::make_shared<const upb::Arena>();
+    return ret;
+  }
+
+  using ArenaArray = std::array<std::shared_ptr<const upb::Arena>, 100>;
+  ArenaArray arenas_ ABSL_GUARDED_BY(mutex_);
+  absl::Mutex mutex_;
 };
 
 TEST(ArenaTest, FuzzSingleThreaded) {