[E2E][JF] Add RPC skeleton for ICAC CSR issuance (#39740)

JFA is the RPC server and JFC si the RPC client. JFC is also the PKI Provider
so JFA has to call methods on JFC: in order to make this possible this commit
is adding RPC Server Streaming functionality on JFA.

Signed-off-by: Doru Gucea <doru-cristian.gucea@nxp.com>
diff --git a/examples/common/pigweed/protos/joint_fabric_service.options b/examples/common/pigweed/protos/joint_fabric_service.options
index 153893b..a9bdf58 100644
--- a/examples/common/pigweed/protos/joint_fabric_service.options
+++ b/examples/common/pigweed/protos/joint_fabric_service.options
@@ -1 +1,2 @@
-OwnershipContext.trustedIcacPublicKeyB max_size:65    // Maximum size of trustedIcacPublicKeyB
\ No newline at end of file
+OwnershipContext.trustedIcacPublicKeyB max_size:65    // Maximum size of trustedIcacPublicKeyB
+ICACCSR.csr max_size:600                              // Maximum size of ICACCSR
diff --git a/examples/common/pigweed/protos/joint_fabric_service.proto b/examples/common/pigweed/protos/joint_fabric_service.proto
index 38afc19..20490ef 100644
--- a/examples/common/pigweed/protos/joint_fabric_service.proto
+++ b/examples/common/pigweed/protos/joint_fabric_service.proto
@@ -24,7 +24,22 @@
   bytes trustedIcacPublicKeyB = 3; // see spec., JCM steps
 }
 
+message ICACCSROptions {
+  uint64 anchor_fabric_id = 1;
+}
+
+message ICACCSR {
+  bytes csr = 1;
+}
+
 service JointFabric {
   /* JFC executes the commissioner role and JFA the administrator steps */
   rpc TransferOwnership(OwnershipContext) returns (pw.protobuf.Empty) {}
+
+  // Server streaming: JFC initiates, JFA is RPC Server-Streaming
+  rpc GetICACCSR(pw.protobuf.Empty) returns (stream ICACCSROptions);
+
+  // JFC replies with the ICACCSR each time JFA requests it
+  // (JFA uses the above ICACCSROptions stream for requests)
+  rpc ReplyWithICACCSR(ICACCSR) returns (pw.protobuf.Empty);
 }
diff --git a/examples/common/pigweed/rpc_services/JointFabric.h b/examples/common/pigweed/rpc_services/JointFabric.h
index 9f9ceed..99bceb1 100644
--- a/examples/common/pigweed/rpc_services/JointFabric.h
+++ b/examples/common/pigweed/rpc_services/JointFabric.h
@@ -1,4 +1,5 @@
 #include "JFAManager.h"
+#include "JFARpc.h"
 #include "joint_fabric_service/joint_fabric_service.rpc.pb.h"
 
 #include <crypto/CHIPCryptoPAL.h>
@@ -9,10 +10,17 @@
 
 namespace joint_fabric_service {
 
-class JointFabric : public pw_rpc::nanopb::JointFabric::Service<JointFabric>
+class JointFabric : public pw_rpc::nanopb::JointFabric::Service<JointFabric>, public JFARpc
 {
 public:
+    /*RPC from jfc-client to jfa-server */
     ::pw::Status TransferOwnership(const ::OwnershipContext & request, ::pw_protobuf_Empty & response);
+    void GetICACCSR(const ::pw_protobuf_Empty & request, ServerWriter<::ICACCSROptions> & writer);
+    ::pw::Status ReplyWithICACCSR(const ::ICACCSR & ICACCSRBytes, ::pw_protobuf_Empty & response);
+
+    /* JFARpc overrides */
+    CHIP_ERROR GetICACCSRForJF(uint64_t anchorFabricId, MutableByteSpan & icacCSR);
+    void CloseStreams();
 
 private:
     struct OwnershipTransferContext
@@ -36,6 +44,8 @@
         JFAMgr().FinalizeCommissioning(data->mNodeId, data->mJCM, data->mTrustedIcacPublicKeyBSerialized);
         Platform::Delete(data);
     }
+
+    ServerWriter<::ICACCSROptions> rpcStreamGetICACCSR;
 };
 
 } // namespace joint_fabric_service
diff --git a/examples/jf-admin-app/linux/BUILD.gn b/examples/jf-admin-app/linux/BUILD.gn
index 9a70fc5..a8351b1 100644
--- a/examples/jf-admin-app/linux/BUILD.gn
+++ b/examples/jf-admin-app/linux/BUILD.gn
@@ -39,6 +39,7 @@
     "JFAManager.cpp",
     "include/CHIPProjectAppConfig.h",
     "include/JFAManager.h",
+    "include/JFARpc.h",
     "main.cpp",
   ]
 
diff --git a/examples/jf-admin-app/linux/JFAManager.cpp b/examples/jf-admin-app/linux/JFAManager.cpp
index b8adf67..dbb2a9f 100644
--- a/examples/jf-admin-app/linux/JFAManager.cpp
+++ b/examples/jf-admin-app/linux/JFAManager.cpp
@@ -59,6 +59,11 @@
     return CHIP_NO_ERROR;
 }
 
+void JFAManager::SetJFARpc(JFARpc & aJFARpc)
+{
+    mJFARpc = &aJFARpc;
+}
+
 void JFAManager::HandleCommissioningCompleteEvent()
 {
     for (const auto & fb : mServer->GetFabricTable())
diff --git a/examples/jf-admin-app/linux/include/JFAManager.h b/examples/jf-admin-app/linux/include/JFAManager.h
index c232190..3f7d84b 100644
--- a/examples/jf-admin-app/linux/include/JFAManager.h
+++ b/examples/jf-admin-app/linux/include/JFAManager.h
@@ -27,6 +27,8 @@
 
 #include <lib/core/CHIPError.h>
 
+#include "JFARpc.h"
+
 namespace chip {
 
 class JFAManager
@@ -38,6 +40,8 @@
     void HandleCommissioningCompleteEvent();
     CHIP_ERROR FinalizeCommissioning(NodeId nodeId, bool isJCM, chip::Crypto::P256PublicKey & trustedIcacPublicKeyB);
 
+    void SetJFARpc(JFARpc & aJFARpc);
+
 private:
     // Various actions to take when OnConnected callback is called
     enum OnConnectedAction
@@ -54,6 +58,7 @@
     Server * mServer                          = nullptr;
     CASESessionManager * mCASESessionManager  = nullptr;
     Messaging::ExchangeManager * mExchangeMgr = nullptr;
+    JFARpc * mJFARpc                          = nullptr;
     SessionHolder mSessionHolder;
     Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
     Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
diff --git a/examples/jf-admin-app/linux/include/JFARpc.h b/examples/jf-admin-app/linux/include/JFARpc.h
new file mode 100644
index 0000000..d71e7fe
--- /dev/null
+++ b/examples/jf-admin-app/linux/include/JFARpc.h
@@ -0,0 +1,33 @@
+/*
+ *
+ *    Copyright (c) 2025 Project CHIP Authors
+ *    All rights reserved.
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#pragma once
+
+#include <lib/core/CHIPError.h>
+
+namespace chip {
+
+class JFARpc
+{
+public:
+    virtual ~JFARpc() {}
+    virtual CHIP_ERROR GetICACCSRForJF(uint64_t anchorFabricId, MutableByteSpan & icacCSR) = 0;
+    virtual void CloseStreams()                                                            = 0;
+};
+
+} // namespace chip
diff --git a/examples/jf-admin-app/linux/rpc/RpcServer.cpp b/examples/jf-admin-app/linux/rpc/RpcServer.cpp
index 89644de..a321961 100644
--- a/examples/jf-admin-app/linux/rpc/RpcServer.cpp
+++ b/examples/jf-admin-app/linux/rpc/RpcServer.cpp
@@ -8,10 +8,17 @@
 
 namespace joint_fabric_service {
 
+constexpr uint32_t kRpcTimeoutMs = 1000;
+std::condition_variable responseCv;
+bool responseReceived = false;
+
+uint8_t icacCSRBuf[Crypto::kMIN_CSR_Buffer_Size] = { 0 };
+MutableByteSpan icacCSRSpan{ icacCSRBuf };
+
 ::pw::Status JointFabric::TransferOwnership(const ::OwnershipContext & request, ::pw_protobuf_Empty & response)
 {
-    ChipLogProgress(JointFabric, "Ownership Transfer for NodeId: 0x" ChipLogFormatX64 ", jcm=%d", ChipLogValueX64(request.node_id),
-                    request.jcm);
+    ChipLogProgress(JointFabric, "RPC Ownership Transfer for NodeId: 0x" ChipLogFormatX64 ", jcm=%d",
+                    ChipLogValueX64(request.node_id), request.jcm);
 
     if (request.jcm && (Crypto::kP256_PublicKey_Length != request.trustedIcacPublicKeyB.size))
     {
@@ -32,4 +39,57 @@
     return pw::OkStatus();
 }
 
+void JointFabric::GetICACCSR(const ::pw_protobuf_Empty & request, ServerWriter<::ICACCSROptions> & writer)
+{
+    ChipLogProgress(JointFabric, "GetICACCSR Stream Opened");
+    rpcStreamGetICACCSR = std::move(writer);
+
+    return;
+}
+
+::pw::Status JointFabric::ReplyWithICACCSR(const ::ICACCSR & ICACCSRBytes, ::pw_protobuf_Empty & response)
+{
+    ChipLogProgress(JointFabric, "RPC ReplyWithICACCSR");
+
+    CopySpanToMutableSpan(ByteSpan(ICACCSRBytes.csr.bytes, ICACCSRBytes.csr.size), icacCSRSpan);
+
+    responseReceived = true;
+    responseCv.notify_one();
+
+    return pw::OkStatus();
+}
+
+CHIP_ERROR JointFabric::GetICACCSRForJF(uint64_t anchorFabricId, MutableByteSpan & icacCSR)
+{
+    std::mutex responseMutex;
+    std::unique_lock<std::mutex> lock(responseMutex);
+    ::pw::Status status;
+
+    // JFA requests an ICAC CSR from JFC
+    ICACCSROptions icaccsrOptions{ anchorFabricId };
+    status = rpcStreamGetICACCSR.Write(icaccsrOptions);
+
+    if (pw::OkStatus() != status)
+    {
+        ChipLogError(JointFabric, "Writing to RPC stream GetICACCSR failed");
+        return CHIP_ERROR_SHUT_DOWN;
+    }
+
+    // wait for the ICAC CSR from JFC
+    if (responseCv.wait_for(lock, std::chrono::milliseconds(kRpcTimeoutMs), [] { return responseReceived; }))
+    {
+        CopySpanToMutableSpan(ByteSpan(icacCSRSpan.data(), icacCSRSpan.size()), icacCSR);
+        responseReceived = false;
+
+        return CHIP_NO_ERROR;
+    }
+
+    return CHIP_ERROR_TIMEOUT;
+}
+
+void JointFabric::CloseStreams()
+{
+    rpcStreamGetICACCSR.Finish();
+}
+
 } // namespace joint_fabric_service
diff --git a/examples/jf-control-app/commands/pairing/PairingCommand.cpp b/examples/jf-control-app/commands/pairing/PairingCommand.cpp
index 852d17c..fcae701 100644
--- a/examples/jf-control-app/commands/pairing/PairingCommand.cpp
+++ b/examples/jf-control-app/commands/pairing/PairingCommand.cpp
@@ -507,6 +507,13 @@
 constexpr uint32_t kRpcTimeoutMs     = 1000;
 constexpr uint32_t kDefaultChannelId = 1;
 
+struct ICACCSRContext
+{
+    ICACCSRContext(uint64_t fabricId) : mAnchorFabricId(fabricId) {}
+
+    uint64_t mAnchorFabricId;
+};
+
 ::pw_rpc::nanopb::JointFabric::Client rpcClient(chip::rpc::client::GetDefaultRpcClient(), kDefaultChannelId);
 
 std::mutex responseMutex;
@@ -533,8 +540,7 @@
     }
 }
 
-// Callback function to be called when the RPC response is received
-void OnOwnershipTransferDone(const _pw_protobuf_Empty & response, ::pw::Status status)
+void OnRPCTransferDone(const _pw_protobuf_Empty & response, ::pw::Status status)
 {
     std::lock_guard<std::mutex> lock(responseMutex);
     responseReceived = true;
@@ -543,11 +549,58 @@
 
     if (status.ok())
     {
-        ChipLogProgress(JointFabric, "OnOwnershipTransferDone RPC call succeeded!");
+        ChipLogProgress(JointFabric, "OnRPCTransferDone RPC call succeeded!");
     }
     else
     {
-        ChipLogProgress(JointFabric, "OnOwnershipTransferDone RPC call failed with status: %d\n", status.code());
+        ChipLogProgress(JointFabric, "OnRPCTransferDone RPC call failed with status: %d\n", status.code());
+    }
+}
+
+static void GenerateICACCSRWork(intptr_t arg)
+{
+    ICACCSRContext * data = reinterpret_cast<ICACCSRContext *>(arg);
+    ICACCSR reply;
+
+    // TODO: call the function that creates the ICAC CSR and populate reply
+
+    Platform::Delete(data);
+
+    // RPC call to JFA to reply with the ICAC CSR
+    auto call = rpcClient.ReplyWithICACCSR(reply, OnRPCTransferDone);
+    if (!call.active())
+    {
+        ChipLogError(JointFabric, "RPC, ReplyWithICACCSR Call Error");
+        return;
+    }
+
+    CHIP_ERROR err = WaitForResponse(call);
+    if (err != CHIP_NO_ERROR)
+    {
+        ChipLogError(JointFabric, "GenerateICACCSRWork, WaitForResponse Error");
+    }
+}
+
+void OnGetICACSROnNext(const ICACCSROptions & rpcIcacOptions)
+{
+    ChipLogProgress(JointFabric, "OnGetICACSROnNext, fabricId: %ld", rpcIcacOptions.anchor_fabric_id);
+
+    ICACCSRContext * options = Platform::New<ICACCSRContext>(rpcIcacOptions.anchor_fabric_id);
+    if (options)
+    {
+        DeviceLayer::PlatformMgr().ScheduleWork(GenerateICACCSRWork, reinterpret_cast<intptr_t>(options));
+    }
+}
+
+void OnGetICACSROnDone(::pw::Status status)
+{
+    if (status.ok())
+    {
+        ChipLogProgress(JointFabric, "GetICACCSR RPC Stream successfully closed!");
+    }
+    else
+    {
+        ChipLogProgress(JointFabric, "GetICACCSR RPC Stream closed with error: %d\n", status.code());
     }
 }
 
@@ -562,6 +615,21 @@
         {
             ChipLogProgress(JointFabric, "Anchor Administrator commissioned with sucess");
             mAnchorNodeId = nodeId;
+
+            _pw_protobuf_Empty request;
+
+            ::pw::rpc::NanopbClientReader<::ICACCSROptions> localStream =
+                rpcClient.GetICACCSR(request, OnGetICACSROnNext, OnGetICACSROnDone);
+            if (!localStream.active())
+            {
+                ChipLogError(JointFabric, "RPC: Opening GetICACCSROptions Stream Error");
+                SetCommandExitStatus(CHIP_ERROR_SHUT_DOWN);
+                return;
+            }
+            else
+            {
+                rpcStreamGetICACCSR = std::move(localStream);
+            }
         }
         else
         {
@@ -595,17 +663,20 @@
                 }
             }
 
-            auto call = rpcClient.TransferOwnership(request, OnOwnershipTransferDone);
+            auto call = rpcClient.TransferOwnership(request, OnRPCTransferDone);
             if (!call.active())
             {
                 ChipLogError(JointFabric, "RPC: OwnershipTransfer Call Error");
-                // The RPC call was not sent. This could occur due to, for example, an invalid channel ID. Handle if necessary.
+                SetCommandExitStatus(CHIP_ERROR_SHUT_DOWN);
+                return;
             }
 
             err = WaitForResponse(call);
             if (err != CHIP_NO_ERROR)
             {
                 ChipLogError(JointFabric, "RPC: OwnershipTransfer Timeout Error");
+                SetCommandExitStatus(err);
+                return;
             }
         }
     }
diff --git a/examples/jf-control-app/commands/pairing/PairingCommand.h b/examples/jf-control-app/commands/pairing/PairingCommand.h
index 764a1be..9032b97 100644
--- a/examples/jf-control-app/commands/pairing/PairingCommand.h
+++ b/examples/jf-control-app/commands/pairing/PairingCommand.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include "../common/CHIPCommand.h"
+#include "joint_fabric_service/joint_fabric_service.rpc.pb.h"
 #include <controller/CommissioningDelegate.h>
 #include <controller/CurrentFabricRemover.h>
 
@@ -316,6 +317,7 @@
     uint8_t mRandomGeneratedICDSymmetricKey[chip::Crypto::kAES_CCM128_Key_Length];
 
     chip::Optional<bool> mExecuteJCM;
+    ::pw::rpc::NanopbClientReader<::ICACCSROptions> rpcStreamGetICACCSR;
 
     // For unpair
     chip::Platform::UniquePtr<chip::Controller::CurrentFabricRemover> mCurrentFabricRemover;
diff --git a/examples/platform/linux/Rpc.cpp b/examples/platform/linux/Rpc.cpp
index 05a2af1..e1fb0ab 100644
--- a/examples/platform/linux/Rpc.cpp
+++ b/examples/platform/linux/Rpc.cpp
@@ -145,6 +145,9 @@
 
 #if defined(PW_RPC_JF_ADMIN_SERVICE) && PW_RPC_JF_ADMIN_SERVICE
     server.RegisterService(joint_fabric_service);
+
+    JFAMgr().SetJFARpc(joint_fabric_service);
+
 #endif // defined(PW_RPC_JF_ADMIN_SERVICE) && PW_RPC_JF_ADMIN_SERVICE
 }