Efficient Conversion of a FlatBufferBuilder to a MessageBuilder (#4980)

* Efficient conversion of FlatBufferBuilder to grpc::MessageBuilder

* Added a variety of tests to validate correctness of the MessageBuilder move operations.
Disable MessageBuilder half-n-half tests on MacOS.

* Fix failing Android build

* Generalized the MessageBuilder move constructor to accept a deallocator
diff --git a/grpc/tests/grpctest.cpp b/grpc/tests/grpctest.cpp
index 50b17cc..7e5c6e6 100644
--- a/grpc/tests/grpctest.cpp
+++ b/grpc/tests/grpctest.cpp
@@ -23,6 +23,9 @@
 #include "test_assert.h"
 
 using namespace MyGame::Example;
+using flatbuffers::grpc::MessageBuilder;
+using flatbuffers::FlatBufferBuilder;
+
 void message_builder_tests();
 
 // The callback implementation of our server, that derives from the generated
@@ -46,9 +49,9 @@
       const flatbuffers::grpc::Message<Stat> *request,
       ::grpc::ServerWriter<flatbuffers::grpc::Message<Monster>> *writer)
       override {
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 5; i++) {
       fbb_.Clear();
-      // Create 10 monsters for resposne.
+      // Create 5 monsters for the response.
       auto monster_offset =
           CreateMonster(fbb_, 0, 0, 0,
                         fbb_.CreateString(request->GetRoot()->id()->str() +
@@ -94,6 +97,45 @@
   server_instance->Wait();
 }
 
+template <class Builder>
+void StoreRPC(MonsterStorage::Stub *stub) {
+  Builder fbb;
+  grpc::ClientContext context;
+  // Build a request with the name set.
+  auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred"));
+  MessageBuilder mb(std::move(fbb));
+  mb.Finish(monster_offset);
+  auto request = mb.ReleaseMessage<Monster>();
+  flatbuffers::grpc::Message<Stat> response;
+
+  // The actual RPC.
+  auto status = stub->Store(&context, request, &response);
+
+  if (status.ok()) {
+    auto resp = response.GetRoot()->id();
+    std::cout << "RPC response: " << resp->str() << std::endl;
+  } else {
+    std::cout << "RPC failed" << std::endl;
+  }
+}
+
+template <class Builder>
+void RetrieveRPC(MonsterStorage::Stub *stub) {
+  Builder fbb;
+  grpc::ClientContext context;
+  fbb.Clear();
+  auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred"));
+  fbb.Finish(stat_offset);
+  auto request = MessageBuilder(std::move(fbb)).ReleaseMessage<Stat>();
+
+  flatbuffers::grpc::Message<Monster> response;
+  auto stream = stub->Retrieve(&context, request);
+  while (stream->Read(&response)) {
+    auto resp = response.GetRoot()->name();
+    std::cout << "RPC Streaming response: " << resp->str() << std::endl;
+  }
+}
+
 int grpc_server_test() {
   // Launch server.
   std::thread server_thread(RunServer);
@@ -107,39 +149,12 @@
                                      grpc::InsecureChannelCredentials());
   auto stub = MyGame::Example::MonsterStorage::NewStub(channel);
 
-  flatbuffers::grpc::MessageBuilder fbb;
-  {
-    grpc::ClientContext context;
-    // Build a request with the name set.
-    auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred"));
-    fbb.Finish(monster_offset);
-    auto request = fbb.ReleaseMessage<Monster>();
-    flatbuffers::grpc::Message<Stat> response;
+  StoreRPC<MessageBuilder>(stub.get());
+  StoreRPC<FlatBufferBuilder>(stub.get());
 
-    // The actual RPC.
-    auto status = stub->Store(&context, request, &response);
+  RetrieveRPC<MessageBuilder>(stub.get());
+  RetrieveRPC<FlatBufferBuilder>(stub.get());
 
-    if (status.ok()) {
-      auto resp = response.GetRoot()->id();
-      std::cout << "RPC response: " << resp->str() << std::endl;
-    } else {
-      std::cout << "RPC failed" << std::endl;
-    }
-  }
-  {
-    grpc::ClientContext context;
-    fbb.Clear();
-    auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred"));
-    fbb.Finish(stat_offset);
-    auto request = fbb.ReleaseMessage<Stat>();
-
-    flatbuffers::grpc::Message<Monster> response;
-    auto stream = stub->Retrieve(&context, request);
-    while (stream->Read(&response)) {
-      auto resp = response.GetRoot()->name();
-      std::cout << "RPC Streaming response: " << resp->str() << std::endl;
-    }
-  }
 
 #if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
   {
diff --git a/grpc/tests/message_builder_test.cpp b/grpc/tests/message_builder_test.cpp
index 25f04da..d5ed8d9 100644
--- a/grpc/tests/message_builder_test.cpp
+++ b/grpc/tests/message_builder_test.cpp
@@ -14,19 +14,37 @@
   return (monster->name()->str() == expected_name) && (monster->color() == color);
 }
 
-template <>
-struct BuilderReuseTests<flatbuffers::grpc::MessageBuilder> {
+void builder_move_assign_after_releaseraw_test(flatbuffers::grpc::MessageBuilder dst) {
+  auto root_offset1 = populate1(dst);
+  dst.Finish(root_offset1);
+  size_t size, offset;
+  grpc_slice slice;
+  dst.ReleaseRaw(size, offset, slice);
+  flatbuffers::FlatBufferBuilder src;
+  auto root_offset2 = populate2(src);
+  src.Finish(root_offset2);
+  auto src_size = src.GetSize();
+  // Move into a released builder.
+  dst = std::move(src);
+  TEST_EQ(dst.GetSize(), src_size);
+  TEST_ASSERT(release_n_verify(dst, m2_name, m2_color));
+  TEST_EQ(src.GetSize(), 0);
+  grpc_slice_unref(slice);
+}
+
+template <class SrcBuilder>
+struct BuilderReuseTests<flatbuffers::grpc::MessageBuilder, SrcBuilder> {
   static void builder_reusable_after_release_message_test(TestSelector selector) {
     if (!selector.count(REUSABLE_AFTER_RELEASE_MESSAGE)) {
       return;
     }
 
-    flatbuffers::grpc::MessageBuilder b1;
+    flatbuffers::grpc::MessageBuilder mb;
     std::vector<flatbuffers::grpc::Message<Monster>> buffers;
     for (int i = 0; i < 5; ++i) {
-      auto root_offset1 = populate1(b1);
-      b1.Finish(root_offset1);
-      buffers.push_back(b1.ReleaseMessage<Monster>());
+      auto root_offset1 = populate1(mb);
+      mb.Finish(root_offset1);
+      buffers.push_back(mb.ReleaseMessage<Monster>());
       TEST_ASSERT_FUNC(verify(buffers[i], m1_name, m1_color));
     }
   }
@@ -36,14 +54,15 @@
       return;
     }
 
-    // FIXME: Populate-Release loop fails assert(GRPC_SLICE_IS_EMPTY(slice_)).
+    // FIXME: Populate-Release loop fails assert(GRPC_SLICE_IS_EMPTY(slice_)) in SliceAllocator::allocate
+    // in the second iteration.
 
-    flatbuffers::grpc::MessageBuilder b1;
+    flatbuffers::grpc::MessageBuilder mb;
     std::vector<flatbuffers::DetachedBuffer> buffers;
-    for (int i = 0; i < 5; ++i) {
-      auto root_offset1 = populate1(b1);
-      b1.Finish(root_offset1);
-      buffers.push_back(b1.Release());
+    for (int i = 0; i < 2; ++i) {
+      auto root_offset1 = populate1(mb);
+      mb.Finish(root_offset1);
+      buffers.push_back(mb.Release());
       TEST_ASSERT_FUNC(verify(buffers[i], m1_name, m1_color));
     }
   }
@@ -53,13 +72,13 @@
       return;
     }
 
-    flatbuffers::grpc::MessageBuilder b1;
+    flatbuffers::grpc::MessageBuilder mb;
     for (int i = 0; i < 5; ++i) {
-      auto root_offset1 = populate1(b1);
-      b1.Finish(root_offset1);
+      auto root_offset1 = populate1(mb);
+      mb.Finish(root_offset1);
       size_t size, offset;
       grpc_slice slice;
-      const uint8_t *buf = b1.ReleaseRaw(size, offset, slice);
+      const uint8_t *buf = mb.ReleaseRaw(size, offset, slice);
       TEST_ASSERT_FUNC(verify(buf, offset, m1_name, m1_color));
       grpc_slice_unref(slice);
     }
@@ -70,22 +89,23 @@
       return;
     }
 
-    // FIXME: Release-move_assign loop fails assert(p == GRPC_SLICE_START_PTR(slice_)).
+    // FIXME: Release-move_assign loop fails assert(p == GRPC_SLICE_START_PTR(slice_))
+    // in DetachedBuffer destructor after all the iterations
 
-    flatbuffers::grpc::MessageBuilder b1;
+    flatbuffers::grpc::MessageBuilder dst;
     std::vector<flatbuffers::DetachedBuffer> buffers;
 
-    for (int i = 0; i < 1; ++i) {
-      auto root_offset1 = populate1(b1);
-      b1.Finish(root_offset1);
-      buffers.push_back(b1.Release());
+    for (int i = 0; i < 2; ++i) {
+      auto root_offset1 = populate1(dst);
+      dst.Finish(root_offset1);
+      buffers.push_back(dst.Release());
       TEST_ASSERT_FUNC(verify(buffers[i], m1_name, m1_color));
 
-      // bring b1 back to life.
-      flatbuffers::grpc::MessageBuilder b2;
-      b1 = std::move(b2);
-      TEST_EQ_FUNC(b1.GetSize(), 0);
-      TEST_EQ_FUNC(b2.GetSize(), 0);
+      // bring dst back to life.
+      SrcBuilder src;
+      dst = std::move(src);
+      TEST_EQ_FUNC(dst.GetSize(), 0);
+      TEST_EQ_FUNC(src.GetSize(), 0);
     }
   }
 
@@ -94,20 +114,20 @@
       return;
     }
 
-    flatbuffers::grpc::MessageBuilder b1;
+    flatbuffers::grpc::MessageBuilder dst;
     std::vector<flatbuffers::grpc::Message<Monster>> buffers;
 
     for (int i = 0; i < 5; ++i) {
-      auto root_offset1 = populate1(b1);
-      b1.Finish(root_offset1);
-      buffers.push_back(b1.ReleaseMessage<Monster>());
+      auto root_offset1 = populate1(dst);
+      dst.Finish(root_offset1);
+      buffers.push_back(dst.ReleaseMessage<Monster>());
       TEST_ASSERT_FUNC(verify(buffers[i], m1_name, m1_color));
 
-      // bring b1 back to life.
-      flatbuffers::grpc::MessageBuilder b2;
-      b1 = std::move(b2);
-      TEST_EQ_FUNC(b1.GetSize(), 0);
-      TEST_EQ_FUNC(b2.GetSize(), 0);
+      // bring dst back to life.
+      SrcBuilder src;
+      dst = std::move(src);
+      TEST_EQ_FUNC(dst.GetSize(), 0);
+      TEST_EQ_FUNC(src.GetSize(), 0);
     }
   }
 
@@ -116,20 +136,20 @@
       return;
     }
 
-    flatbuffers::grpc::MessageBuilder b1;
+    flatbuffers::grpc::MessageBuilder dst;
     for (int i = 0; i < 5; ++i) {
-      auto root_offset1 = populate1(b1);
-      b1.Finish(root_offset1);
+      auto root_offset1 = populate1(dst);
+      dst.Finish(root_offset1);
       size_t size, offset;
       grpc_slice slice = grpc_empty_slice();
-      const uint8_t *buf = b1.ReleaseRaw(size, offset, slice);
+      const uint8_t *buf = dst.ReleaseRaw(size, offset, slice);
       TEST_ASSERT_FUNC(verify(buf, offset, m1_name, m1_color));
       grpc_slice_unref(slice);
 
-      flatbuffers::grpc::MessageBuilder b2;
-      b1 = std::move(b2); 
-      TEST_EQ_FUNC(b1.GetSize(), 0);
-      TEST_EQ_FUNC(b2.GetSize(), 0);
+      SrcBuilder src;
+      dst = std::move(src);
+      TEST_EQ_FUNC(dst.GetSize(), 0);
+      TEST_EQ_FUNC(src.GetSize(), 0);
     }
   }
 
@@ -153,7 +173,7 @@
     buf[0] = 100;
     buf[size-1] = 200;
     flatbuffers::grpc::SliceAllocator sa2(std::move(sa1));
-    // buf should be deleted after move-construct
+    // buf should not be deleted after move-construct
     TEST_EQ_FUNC(buf[0], 100);
     TEST_EQ_FUNC(buf[size-1], 200);
     // buf is freed here
@@ -170,13 +190,140 @@
   }
 }
 
+/// Populates the "first half" of a Monster table: the name and color fields. This is not literally half of the fields, but it could be.
+void populate_first_half(MyGame::Example::MonsterBuilder &wrapper, flatbuffers::Offset<flatbuffers::String> name_offset) {
+  wrapper.add_name(name_offset);
+  wrapper.add_color(m1_color);
+}
+
+/// Populates the "second half" of a Monster table: the hp, mana and pos fields. This is not literally half of the fields, but it could be.
+void populate_second_half(MyGame::Example::MonsterBuilder &wrapper) {
+  wrapper.add_hp(77);
+  wrapper.add_mana(88);
+  Vec3 vec3;
+  wrapper.add_pos(&vec3);
+}
+
+/// This function is a hack to update the FlatBufferBuilder reference (fbb_) in the MonsterBuilder object.
+/// This function will break if fbb_ is not the first member in MonsterBuilder. In that case, some offset must be added.
+/// This function is used exclusively for testing correctness of move operations between FlatBufferBuilders.
+/// If MonsterBuilder had a fbb_ pointer, this hack would be unnecessary. That involves a code-generator change though.
+void test_only_hack_update_fbb_reference(MyGame::Example::MonsterBuilder &monsterBuilder,
+                                         flatbuffers::grpc::MessageBuilder &mb) {
+  *reinterpret_cast<flatbuffers::FlatBufferBuilder **>(&monsterBuilder) = &mb;
+}
+
+/// This test validates correctness of move-constructing a MessageBuilder from a FlatBufferBuilder DURING
+/// a table construction. Half of the table is constructed using FlatBufferBuilder and the other half
+/// of the table is constructed using a MessageBuilder.
+void builder_move_ctor_conversion_before_finish_half_n_half_table_test() {
+  for (size_t initial_size = 4 ; initial_size <= 2048; initial_size *= 2) {
+    flatbuffers::FlatBufferBuilder fbb(initial_size);
+    auto name_offset = fbb.CreateString(m1_name);
+    MyGame::Example::MonsterBuilder monsterBuilder(fbb);     // starts a table in FlatBufferBuilder
+    populate_first_half(monsterBuilder, name_offset);
+    flatbuffers::grpc::MessageBuilder mb(std::move(fbb));
+    test_only_hack_update_fbb_reference(monsterBuilder, mb); // hack
+    populate_second_half(monsterBuilder);
+    mb.Finish(monsterBuilder.Finish());                      // ends the table in MessageBuilder
+    TEST_ASSERT_FUNC(release_n_verify(mb, m1_name, m1_color));
+    TEST_EQ_FUNC(fbb.GetSize(), 0);
+  }
+}
+
+/// This test populates a COMPLETE inner table before the move-construct conversion and later populates more members in the outer table.
+void builder_move_ctor_conversion_before_finish_test() {
+  for (size_t initial_size = 4 ; initial_size <= 2048; initial_size *= 2) {
+    flatbuffers::FlatBufferBuilder fbb(initial_size);
+    auto stat_offset = CreateStat(fbb, fbb.CreateString("SomeId"), 0, 0);
+    flatbuffers::grpc::MessageBuilder mb(std::move(fbb));
+    auto monster_offset = CreateMonster(mb, 0, 150, 100, mb.CreateString(m1_name), 0, m1_color, Any_NONE, 0, 0, 0, 0, 0, 0, stat_offset);
+    mb.Finish(monster_offset);
+    TEST_ASSERT_FUNC(release_n_verify(mb, m1_name, m1_color));
+    TEST_EQ_FUNC(fbb.GetSize(), 0);
+  }
+}
+
+/// This test validates correctness of move-assigning a FlatBufferBuilder to a MessageBuilder DURING
+/// a table construction. Half of the table is constructed using FlatBufferBuilder and the other half
+/// of the table is constructed using a MessageBuilder.
+void builder_move_assign_conversion_before_finish_half_n_half_table_test() {
+  // Only the destination MessageBuilder is reused; each iteration move-assigns a fresh source builder.
+  flatbuffers::grpc::MessageBuilder mb;
+
+  for (int i = 0; i < 5; ++i) {
+    flatbuffers::FlatBufferBuilder fbb;
+    auto name_offset = fbb.CreateString(m1_name);
+    MyGame::Example::MonsterBuilder monsterBuilder(fbb);     // starts a table in FlatBufferBuilder
+    populate_first_half(monsterBuilder, name_offset);
+    mb = std::move(fbb);
+    test_only_hack_update_fbb_reference(monsterBuilder, mb); // hack
+    populate_second_half(monsterBuilder);
+    mb.Finish(monsterBuilder.Finish());                      // ends the table in MessageBuilder
+    TEST_ASSERT_FUNC(release_n_verify(mb, m1_name, m1_color));
+    TEST_EQ_FUNC(fbb.GetSize(), 0);
+  }
+}
+
+/// This test populates a COMPLETE inner table before each move-assign conversion and later populates more members in the outer table.
+void builder_move_assign_conversion_before_finish_test() {
+  flatbuffers::FlatBufferBuilder fbb;
+  flatbuffers::grpc::MessageBuilder mb;
+
+  for (int i = 0;i < 5; ++i) {
+    auto stat_offset = CreateStat(fbb, fbb.CreateString("SomeId"), 0, 0);
+    mb = std::move(fbb);
+    auto monster_offset = CreateMonster(mb, 0, 150, 100, mb.CreateString(m1_name), 0, m1_color, Any_NONE, 0, 0, 0, 0, 0, 0, stat_offset);
+    mb.Finish(monster_offset);
+    TEST_ASSERT_FUNC(release_n_verify(mb, m1_name, m1_color));
+    TEST_EQ_FUNC(fbb.GetSize(), 0);
+  }
+}
+
+/// This test populates data, finishes the buffer, and does the move-construct conversion after.
+void builder_move_ctor_conversion_after_finish_test() {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.Finish(populate1(fbb));
+  flatbuffers::grpc::MessageBuilder mb(std::move(fbb));
+  TEST_ASSERT_FUNC(release_n_verify(mb, m1_name, m1_color));
+  TEST_EQ_FUNC(fbb.GetSize(), 0);
+}
+
+/// This test populates data, finishes the buffer, and does the move-assign conversion after; the same two builders are reused for 5 rounds.
+void builder_move_assign_conversion_after_finish_test() {
+  flatbuffers::FlatBufferBuilder fbb;
+  flatbuffers::grpc::MessageBuilder mb;
+
+  for (int i = 0;i < 5; ++i) {
+    fbb.Finish(populate1(fbb));
+    mb = std::move(fbb);
+    TEST_ASSERT_FUNC(release_n_verify(mb, m1_name, m1_color));
+    TEST_EQ_FUNC(fbb.GetSize(), 0);
+  }
+}
+
 void message_builder_tests() {
+  using flatbuffers::grpc::MessageBuilder;
+  using flatbuffers::FlatBufferBuilder;
+
   slice_allocator_tests();
-  BuilderTests<flatbuffers::grpc::MessageBuilder>::all_tests();
+
+#ifndef __APPLE__
+  builder_move_ctor_conversion_before_finish_half_n_half_table_test();
+  builder_move_assign_conversion_before_finish_half_n_half_table_test();
+#endif // __APPLE__
+  builder_move_ctor_conversion_before_finish_test();
+  builder_move_assign_conversion_before_finish_test();
+
+  builder_move_ctor_conversion_after_finish_test();
+  builder_move_assign_conversion_after_finish_test();
+
+  BuilderTests<MessageBuilder, MessageBuilder>::all_tests();
+  BuilderTests<MessageBuilder, FlatBufferBuilder>::all_tests();
 
   BuilderReuseTestSelector tests[6] = {
-    // REUSABLE_AFTER_RELEASE,                 // Assertion failed: (GRPC_SLICE_IS_EMPTY(slice_))
-    // REUSABLE_AFTER_RELEASE_AND_MOVE_ASSIGN, // Assertion failed: (p == GRPC_SLICE_START_PTR(slice_)
+    //REUSABLE_AFTER_RELEASE,                 // Assertion failed: (GRPC_SLICE_IS_EMPTY(slice_))
+    //REUSABLE_AFTER_RELEASE_AND_MOVE_ASSIGN, // Assertion failed: (p == GRPC_SLICE_START_PTR(slice_)
 
     REUSABLE_AFTER_RELEASE_RAW,
     REUSABLE_AFTER_RELEASE_MESSAGE,
@@ -184,5 +331,6 @@
     REUSABLE_AFTER_RELEASE_RAW_AND_MOVE_ASSIGN
   };
 
-  BuilderReuseTests<flatbuffers::grpc::MessageBuilder>::run_tests(TestSelector(tests, tests+6));
+  BuilderReuseTests<MessageBuilder, MessageBuilder>::run_tests(TestSelector(tests, tests+6));
+  BuilderReuseTests<MessageBuilder, FlatBufferBuilder>::run_tests(TestSelector(tests, tests+6));
 }