| /* |
| * |
| * Copyright (c) 2021 Project CHIP Authors |
| * All rights reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /** |
| * @file |
| * This file defines the initiator side of a CHIP Read Interaction. |
| * |
| */ |
| |
| #include <app/AppConfig.h> |
| #include <app/InteractionModelEngine.h> |
| #include <app/InteractionModelHelper.h> |
| #include <app/ReadClient.h> |
| #include <app/StatusResponse.h> |
| #include <assert.h> |
| #include <lib/core/TLVTypes.h> |
| #include <lib/support/FibonacciUtils.h> |
| #include <messaging/ReliableMessageMgr.h> |
| #include <messaging/ReliableMessageProtocolConfig.h> |
| |
| namespace chip { |
| namespace app { |
| |
| using Status = Protocols::InteractionModel::Status; |
| |
| ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback, |
| InteractionType aInteractionType) : |
| mExchange(*this), |
| mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this), |
| mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) |
| { |
| mpExchangeMgr = apExchangeMgr; |
| mInteractionType = aInteractionType; |
| |
| mpImEngine = apImEngine; |
| |
| if (aInteractionType == InteractionType::Subscribe) |
| { |
| mpImEngine->AddReadClient(this); |
| } |
| } |
| |
| void ReadClient::ClearActiveSubscriptionState() |
| { |
| mIsReporting = false; |
| mWaitingForFirstPrimingReport = true; |
| mPendingMoreChunks = false; |
| mMinIntervalFloorSeconds = 0; |
| mMaxInterval = 0; |
| mSubscriptionId = 0; |
| mIsResubscriptionScheduled = false; |
| MoveToState(ClientState::Idle); |
| } |
| |
| void ReadClient::StopResubscription() |
| { |
| CancelLivenessCheckTimer(); |
| CancelResubscribeTimer(); |
| |
| // Only deallocate the paths if they are not already deallocated. |
| if (mReadPrepareParams.mpAttributePathParamsList != nullptr || mReadPrepareParams.mpEventPathParamsList != nullptr || |
| mReadPrepareParams.mpDataVersionFilterList != nullptr) |
| { |
| mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams)); |
| // Make sure we will never try to free those pointers again. |
| mReadPrepareParams.mpAttributePathParamsList = nullptr; |
| mReadPrepareParams.mAttributePathParamsListSize = 0; |
| mReadPrepareParams.mpEventPathParamsList = nullptr; |
| mReadPrepareParams.mEventPathParamsListSize = 0; |
| mReadPrepareParams.mpDataVersionFilterList = nullptr; |
| mReadPrepareParams.mDataVersionFilterListSize = 0; |
| } |
| } |
| |
| ReadClient::~ReadClient() |
| { |
| if (IsSubscriptionType()) |
| { |
| StopResubscription(); |
| |
| // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it. |
| // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine |
| // will point to null) |
| // |
| if (mpImEngine) |
| { |
| mpImEngine->RemoveReadClient(this); |
| } |
| } |
| } |
| |
| uint32_t ReadClient::ComputeTimeTillNextSubscription() |
| { |
| uint32_t maxWaitTimeInMsec = 0; |
| uint32_t waitTimeInMsec = 0; |
| uint32_t minWaitTimeInMsec = 0; |
| |
| if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) |
| { |
| maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; |
| } |
| else |
| { |
| maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; |
| } |
| |
| if (maxWaitTimeInMsec != 0) |
| { |
| minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; |
| waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); |
| } |
| |
| return waitTimeInMsec; |
| } |
| |
| CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional<SessionHandle> aNewSessionHandle, |
| bool aReestablishCASE) |
| { |
| VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); |
| |
| // |
| // If we're establishing CASE, make sure we are not provided a new SessionHandle as well. |
| // |
| VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT); |
| |
| if (aNewSessionHandle.HasValue()) |
| { |
| mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value()); |
| } |
| |
| mForceCaseOnNextResub = aReestablishCASE; |
| if (mForceCaseOnNextResub && mReadPrepareParams.mSessionHolder) |
| { |
| // Mark our existing session defunct, so that we will try to |
| // re-establish it when the timer fires (unless something re-establishes |
| // before then). |
| mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct(); |
| } |
| |
| ReturnErrorOnFailure( |
| InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( |
| System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this)); |
| mIsResubscriptionScheduled = true; |
| |
| return CHIP_NO_ERROR; |
| } |
| |
| void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription) |
| { |
| if (IsReadType()) |
| { |
| if (aError != CHIP_NO_ERROR) |
| { |
| mpCallback.OnError(aError); |
| } |
| } |
| else |
| { |
| ClearActiveSubscriptionState(); |
| if (aError != CHIP_NO_ERROR) |
| { |
| // |
| // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present |
| // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which |
| // populates mReadPrepareParams with the values provided by the application. |
| // |
| if (allowResubscription && |
| (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0)) |
| { |
| aError = mpCallback.OnResubscriptionNeeded(this, aError); |
| if (aError == CHIP_NO_ERROR) |
| { |
| return; |
| } |
| } |
| |
| // |
| // Either something bad happened when requesting resubscription or the application has decided to not |
| // continue by returning an error. Let's convey the error back up to the application |
| // and shut everything down. |
| // |
| mpCallback.OnError(aError); |
| } |
| |
| StopResubscription(); |
| } |
| |
| mExchange.Release(); |
| |
| mpCallback.OnDone(this); |
| } |
| |
| const char * ReadClient::GetStateStr() const |
| { |
| #if CHIP_DETAIL_LOGGING |
| switch (mState) |
| { |
| case ClientState::Idle: |
| return "Idle"; |
| case ClientState::AwaitingInitialReport: |
| return "AwaitingInitialReport"; |
| case ClientState::AwaitingSubscribeResponse: |
| return "AwaitingSubscribeResponse"; |
| case ClientState::SubscriptionActive: |
| return "SubscriptionActive"; |
| } |
| #endif // CHIP_DETAIL_LOGGING |
| return "N/A"; |
| } |
| |
| void ReadClient::MoveToState(const ClientState aTargetState) |
| { |
| mState = aTargetState; |
| ChipLogDetail(DataManagement, "%s ReadClient[%p]: Moving to [%10.10s]", __func__, this, GetStateStr()); |
| } |
| |
| CHIP_ERROR ReadClient::SendRequest(ReadPrepareParams & aReadPrepareParams) |
| { |
| if (mInteractionType == InteractionType::Read) |
| { |
| return SendReadRequest(aReadPrepareParams); |
| } |
| |
| if (mInteractionType == InteractionType::Subscribe) |
| { |
| return SendSubscribeRequest(aReadPrepareParams); |
| } |
| |
| return CHIP_ERROR_INVALID_ARGUMENT; |
| } |
| |
| CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams) |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| |
| ChipLogDetail(DataManagement, "%s ReadClient[%p]: Sending Read Request", __func__, this); |
| |
| VerifyOrReturnError(ClientState::Idle == mState, err = CHIP_ERROR_INCORRECT_STATE); |
| |
| Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList, |
| aReadPrepareParams.mAttributePathParamsListSize); |
| Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize); |
| Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList, |
| aReadPrepareParams.mDataVersionFilterListSize); |
| |
| System::PacketBufferHandle msgBuf; |
| ReadRequestMessage::Builder request; |
| System::PacketBufferTLVWriter writer; |
| |
| InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead); |
| ReturnErrorOnFailure(request.Init(&writer)); |
| |
| if (!attributePaths.empty()) |
| { |
| AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests(); |
| ReturnErrorOnFailure(err = request.GetError()); |
| ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths)); |
| } |
| |
| if (!eventPaths.empty()) |
| { |
| EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests(); |
| ReturnErrorOnFailure(err = request.GetError()); |
| |
| ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths)); |
| |
| Optional<EventNumber> eventMin; |
| ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin)); |
| if (eventMin.HasValue()) |
| { |
| EventFilterIBs::Builder & eventFilters = request.CreateEventFilters(); |
| ReturnErrorOnFailure(err = request.GetError()); |
| ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value())); |
| } |
| } |
| |
| ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError()); |
| |
| bool encodedDataVersionList = false; |
| TLV::TLVWriter backup; |
| request.Checkpoint(backup); |
| DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters(); |
| ReturnErrorOnFailure(request.GetError()); |
| if (!attributePaths.empty()) |
| { |
| ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters, |
| encodedDataVersionList)); |
| } |
| ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead)); |
| if (encodedDataVersionList) |
| { |
| ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs()); |
| } |
| else |
| { |
| request.Rollback(backup); |
| } |
| |
| ReturnErrorOnFailure(request.EndOfReadRequestMessage()); |
| ReturnErrorOnFailure(writer.Finalize(&msgBuf)); |
| |
| VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION); |
| |
| auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this); |
| VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY); |
| |
| mExchange.Grab(exchange); |
| |
| if (aReadPrepareParams.mTimeout == System::Clock::kZero) |
| { |
| mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime); |
| } |
| else |
| { |
| mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout); |
| } |
| |
| ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf), |
| Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse))); |
| |
| mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer(); |
| MoveToState(ClientState::AwaitingInitialReport); |
| |
| return CHIP_NO_ERROR; |
| } |
| |
| CHIP_ERROR ReadClient::GenerateEventPaths(EventPathIBs::Builder & aEventPathsBuilder, const Span<EventPathParams> & aEventPaths) |
| { |
| for (auto & event : aEventPaths) |
| { |
| VerifyOrReturnError(event.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH_IB); |
| EventPathIB::Builder & path = aEventPathsBuilder.CreatePath(); |
| ReturnErrorOnFailure(aEventPathsBuilder.GetError()); |
| ReturnErrorOnFailure(path.Encode(event)); |
| } |
| |
| return aEventPathsBuilder.EndOfEventPaths(); |
| } |
| |
| CHIP_ERROR ReadClient::GenerateAttributePaths(AttributePathIBs::Builder & aAttributePathIBsBuilder, |
| const Span<AttributePathParams> & aAttributePaths) |
| { |
| for (auto & attribute : aAttributePaths) |
| { |
| VerifyOrReturnError(attribute.IsValidAttributePath(), CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB); |
| AttributePathIB::Builder & path = aAttributePathIBsBuilder.CreatePath(); |
| ReturnErrorOnFailure(aAttributePathIBsBuilder.GetError()); |
| ReturnErrorOnFailure(path.Encode(attribute)); |
| } |
| |
| return aAttributePathIBsBuilder.EndOfAttributePathIBs(); |
| } |
| |
| CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder, |
| const Span<AttributePathParams> & aAttributePaths, |
| const Span<DataVersionFilter> & aDataVersionFilters, |
| bool & aEncodedDataVersionList) |
| { |
| for (auto & filter : aDataVersionFilters) |
| { |
| VerifyOrReturnError(filter.IsValidDataVersionFilter(), CHIP_ERROR_INVALID_ARGUMENT); |
| |
| // If data version filter is for some cluster none of whose attributes are included in our paths, discard this filter. |
| bool intersected = false; |
| for (auto & path : aAttributePaths) |
| { |
| if (path.IncludesAttributesInCluster(filter)) |
| { |
| intersected = true; |
| break; |
| } |
| } |
| |
| if (!intersected) |
| { |
| continue; |
| } |
| |
| DataVersionFilterIB::Builder & filterIB = aDataVersionFilterIBsBuilder.CreateDataVersionFilter(); |
| ReturnErrorOnFailure(aDataVersionFilterIBsBuilder.GetError()); |
| ClusterPathIB::Builder & path = filterIB.CreatePath(); |
| ReturnErrorOnFailure(filterIB.GetError()); |
| ReturnErrorOnFailure(path.Endpoint(filter.mEndpointId).Cluster(filter.mClusterId).EndOfClusterPathIB()); |
| VerifyOrReturnError(filter.mDataVersion.HasValue(), CHIP_ERROR_INVALID_ARGUMENT); |
| ReturnErrorOnFailure(filterIB.DataVersion(filter.mDataVersion.Value()).EndOfDataVersionFilterIB()); |
| aEncodedDataVersionList = true; |
| } |
| return CHIP_NO_ERROR; |
| } |
| |
| CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder, |
| const Span<AttributePathParams> & aAttributePaths, |
| const Span<DataVersionFilter> & aDataVersionFilters, |
| bool & aEncodedDataVersionList) |
| { |
| if (!aDataVersionFilters.empty()) |
| { |
| ReturnErrorOnFailure(BuildDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aDataVersionFilters, |
| aEncodedDataVersionList)); |
| } |
| else |
| { |
| ReturnErrorOnFailure( |
| mpCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList)); |
| } |
| |
| return CHIP_NO_ERROR; |
| } |
| |
| CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader, |
| System::PacketBufferHandle && aPayload) |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| Status status = Status::InvalidAction; |
| VerifyOrExit(!IsIdle(), err = CHIP_ERROR_INCORRECT_STATE); |
| |
| if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::ReportData)) |
| { |
| err = ProcessReportData(std::move(aPayload), ReportType::kContinuingTransaction); |
| } |
| else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse)) |
| { |
| ChipLogProgress(DataManagement, "SubscribeResponse is received"); |
| VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE); |
| err = ProcessSubscribeResponse(std::move(aPayload)); |
| } |
| else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse)) |
| { |
| VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE); |
| CHIP_ERROR statusError = CHIP_NO_ERROR; |
| SuccessOrExit(err = StatusResponse::ProcessStatusResponse(std::move(aPayload), statusError)); |
| SuccessOrExit(err = statusError); |
| err = CHIP_ERROR_INVALID_MESSAGE_TYPE; |
| } |
| else |
| { |
| err = CHIP_ERROR_INVALID_MESSAGE_TYPE; |
| } |
| |
| exit: |
| if (err != CHIP_NO_ERROR) |
| { |
| if (err == CHIP_ERROR_INVALID_SUBSCRIPTION) |
| { |
| status = Status::InvalidSubscription; |
| } |
| StatusResponse::Send(status, apExchangeContext, false /*aExpectResponse*/); |
| } |
| |
| if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR) |
| { |
| Close(err); |
| } |
| |
| return err; |
| } |
| |
| void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload) |
| { |
| Status status = Status::Success; |
| mExchange.Grab(apExchangeContext); |
| |
| // |
| // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received. |
| // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid |
| // session to us, of which there could be multiple. |
| // |
| // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible |
| // to maximize our chances of success later. |
| // |
| mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle()); |
| |
| CHIP_ERROR err = ProcessReportData(std::move(aPayload), ReportType::kUnsolicited); |
| if (err != CHIP_NO_ERROR) |
| { |
| if (err == CHIP_ERROR_INVALID_SUBSCRIPTION) |
| { |
| status = Status::InvalidSubscription; |
| } |
| else |
| { |
| status = Status::InvalidAction; |
| } |
| |
| StatusResponse::Send(status, mExchange.Get(), false /*aExpectResponse*/); |
| Close(err); |
| } |
| } |
| |
| CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload, ReportType aReportType) |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| ReportDataMessage::Parser report; |
| bool suppressResponse = true; |
| SubscriptionId subscriptionId = 0; |
| EventReportIBs::Parser eventReportIBs; |
| AttributeReportIBs::Parser attributeReportIBs; |
| System::PacketBufferTLVReader reader; |
| reader.Init(std::move(aPayload)); |
| err = report.Init(reader); |
| SuccessOrExit(err); |
| |
| #if CHIP_CONFIG_IM_PRETTY_PRINT |
| if (aReportType != ReportType::kUnsolicited) |
| { |
| report.PrettyPrint(); |
| } |
| #endif |
| |
| err = report.GetSuppressResponse(&suppressResponse); |
| if (CHIP_END_OF_TLV == err) |
| { |
| suppressResponse = false; |
| err = CHIP_NO_ERROR; |
| } |
| SuccessOrExit(err); |
| |
| err = report.GetSubscriptionId(&subscriptionId); |
| if (CHIP_NO_ERROR == err) |
| { |
| VerifyOrExit(IsSubscriptionType(), err = CHIP_ERROR_INVALID_ARGUMENT); |
| if (mWaitingForFirstPrimingReport) |
| { |
| mSubscriptionId = subscriptionId; |
| } |
| else if (!IsMatchingSubscriptionId(subscriptionId)) |
| { |
| err = CHIP_ERROR_INVALID_SUBSCRIPTION; |
| } |
| } |
| else if (CHIP_END_OF_TLV == err) |
| { |
| if (IsSubscriptionType()) |
| { |
| err = CHIP_ERROR_INVALID_ARGUMENT; |
| } |
| else |
| { |
| err = CHIP_NO_ERROR; |
| } |
| } |
| SuccessOrExit(err); |
| |
| err = report.GetMoreChunkedMessages(&mPendingMoreChunks); |
| if (CHIP_END_OF_TLV == err) |
| { |
| mPendingMoreChunks = false; |
| err = CHIP_NO_ERROR; |
| } |
| SuccessOrExit(err); |
| |
| err = report.GetEventReports(&eventReportIBs); |
| if (err == CHIP_END_OF_TLV) |
| { |
| err = CHIP_NO_ERROR; |
| } |
| else if (err == CHIP_NO_ERROR) |
| { |
| chip::TLV::TLVReader EventReportsReader; |
| eventReportIBs.GetReader(&EventReportsReader); |
| err = ProcessEventReportIBs(EventReportsReader); |
| } |
| SuccessOrExit(err); |
| |
| err = report.GetAttributeReportIBs(&attributeReportIBs); |
| if (err == CHIP_END_OF_TLV) |
| { |
| err = CHIP_NO_ERROR; |
| } |
| else if (err == CHIP_NO_ERROR) |
| { |
| TLV::TLVReader attributeReportIBsReader; |
| attributeReportIBs.GetReader(&attributeReportIBsReader); |
| err = ProcessAttributeReportIBs(attributeReportIBsReader); |
| } |
| SuccessOrExit(err); |
| |
| if (mIsReporting && !mPendingMoreChunks) |
| { |
| mpCallback.OnReportEnd(); |
| mIsReporting = false; |
| } |
| |
| SuccessOrExit(err = report.ExitContainer()); |
| |
| exit: |
| if (IsSubscriptionType()) |
| { |
| if (IsAwaitingInitialReport()) |
| { |
| MoveToState(ClientState::AwaitingSubscribeResponse); |
| } |
| else if (IsSubscriptionActive() && err == CHIP_NO_ERROR) |
| { |
| // |
| // Only refresh the liveness check timer if we've successfully established |
| // a subscription and have a valid value for mMaxInterval which the function |
| // relies on. |
| // |
| err = RefreshLivenessCheckTimer(); |
| } |
| } |
| |
| if (!suppressResponse && err == CHIP_NO_ERROR) |
| { |
| bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks; |
| err = StatusResponse::Send(Status::Success, mExchange.Get(), !noResponseExpected); |
| } |
| |
| mWaitingForFirstPrimingReport = false; |
| return err; |
| } |
| |
| void ReadClient::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext) |
| { |
| ChipLogError(DataManagement, "Time out! failed to receive report data from Exchange: " ChipLogFormatExchange, |
| ChipLogValueExchange(apExchangeContext)); |
| Close(CHIP_ERROR_TIMEOUT); |
| } |
| |
| CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttributePathParser, |
| ConcreteDataAttributePath & aAttributePath) |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| // The ReportData must contain a concrete attribute path |
| err = aAttributePathParser.GetConcreteAttributePath(aAttributePath); |
| VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB); |
| return CHIP_NO_ERROR; |
| } |
| |
| void ReadClient::NoteReportingData() |
| { |
| if (!mIsReporting) |
| { |
| mpCallback.OnReportBegin(); |
| mIsReporting = true; |
| } |
| } |
| |
| CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader) |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| while (CHIP_NO_ERROR == (err = aAttributeReportIBsReader.Next())) |
| { |
| TLV::TLVReader dataReader; |
| AttributeReportIB::Parser report; |
| AttributeDataIB::Parser data; |
| AttributeStatusIB::Parser status; |
| AttributePathIB::Parser path; |
| ConcreteDataAttributePath attributePath; |
| StatusIB statusIB; |
| |
| TLV::TLVReader reader = aAttributeReportIBsReader; |
| ReturnErrorOnFailure(report.Init(reader)); |
| |
| err = report.GetAttributeStatus(&status); |
| if (CHIP_NO_ERROR == err) |
| { |
| StatusIB::Parser errorStatus; |
| ReturnErrorOnFailure(status.GetPath(&path)); |
| ReturnErrorOnFailure(ProcessAttributePath(path, attributePath)); |
| ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus)); |
| ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB)); |
| NoteReportingData(); |
| mpCallback.OnAttributeData(attributePath, nullptr, statusIB); |
| } |
| else if (CHIP_END_OF_TLV == err) |
| { |
| ReturnErrorOnFailure(report.GetAttributeData(&data)); |
| ReturnErrorOnFailure(data.GetPath(&path)); |
| ReturnErrorOnFailure(ProcessAttributePath(path, attributePath)); |
| DataVersion version = 0; |
| ReturnErrorOnFailure(data.GetDataVersion(&version)); |
| attributePath.mDataVersion.SetValue(version); |
| |
| if (mReadPrepareParams.mpDataVersionFilterList != nullptr) |
| { |
| UpdateDataVersionFilters(attributePath); |
| } |
| |
| ReturnErrorOnFailure(data.GetData(&dataReader)); |
| |
| // The element in an array may be another array -- so we should only set the list operation when we are handling the |
| // whole list. |
| if (!attributePath.IsListOperation() && dataReader.GetType() == TLV::kTLVType_Array) |
| { |
| attributePath.mListOp = ConcreteDataAttributePath::ListOperation::ReplaceAll; |
| } |
| |
| NoteReportingData(); |
| mpCallback.OnAttributeData(attributePath, &dataReader, statusIB); |
| } |
| } |
| |
| if (CHIP_END_OF_TLV == err) |
| { |
| err = CHIP_NO_ERROR; |
| } |
| |
| return err; |
| } |
| |
| CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader) |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next())) |
| { |
| TLV::TLVReader dataReader; |
| EventReportIB::Parser report; |
| EventDataIB::Parser data; |
| EventHeader header; |
| StatusIB statusIB; // Default value for statusIB is success. |
| |
| TLV::TLVReader reader = aEventReportIBsReader; |
| ReturnErrorOnFailure(report.Init(reader)); |
| |
| err = report.GetEventData(&data); |
| |
| if (err == CHIP_NO_ERROR) |
| { |
| header.mTimestamp = mEventTimestamp; |
| ReturnErrorOnFailure(data.DecodeEventHeader(header)); |
| mEventTimestamp = header.mTimestamp; |
| |
| ReturnErrorOnFailure(data.GetData(&dataReader)); |
| |
| // |
| // Update the event number being tracked in mReadPrepareParams in case |
| // we want to send it in the next SubscribeRequest message to convey |
| // the event number for which we have already received an event. |
| // |
| mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); |
| |
| NoteReportingData(); |
| mpCallback.OnEventData(header, &dataReader, nullptr); |
| } |
| else if (err == CHIP_END_OF_TLV) |
| { |
| EventStatusIB::Parser status; |
| EventPathIB::Parser pathIB; |
| StatusIB::Parser statusIBParser; |
| ReturnErrorOnFailure(report.GetEventStatus(&status)); |
| ReturnErrorOnFailure(status.GetPath(&pathIB)); |
| ReturnErrorOnFailure(pathIB.GetEventPath(&header.mPath)); |
| ReturnErrorOnFailure(status.GetErrorStatus(&statusIBParser)); |
| ReturnErrorOnFailure(statusIBParser.DecodeStatusIB(statusIB)); |
| |
| NoteReportingData(); |
| mpCallback.OnEventData(header, nullptr, &statusIB); |
| } |
| } |
| |
| if (CHIP_END_OF_TLV == err) |
| { |
| err = CHIP_NO_ERROR; |
| } |
| |
| return err; |
| } |
| |
| void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout) |
| { |
| mLivenessTimeoutOverride = aLivenessTimeout; |
| auto err = RefreshLivenessCheckTimer(); |
| if (err != CHIP_NO_ERROR) |
| { |
| Close(err); |
| } |
| } |
| |
| CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() |
| { |
| CHIP_ERROR err = CHIP_NO_ERROR; |
| |
| VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE); |
| |
| CancelLivenessCheckTimer(); |
| |
| System::Clock::Timeout timeout; |
| ReturnErrorOnFailure(ComputeLivenessCheckTimerTimeout(&timeout)); |
| |
| // EFR32/MBED/INFINION/K32W's chrono count return long unsigned, but other platform returns unsigned |
| ChipLogProgress( |
| DataManagement, |
| "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64, |
| static_cast<long unsigned>(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId())); |
| err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( |
| timeout, OnLivenessTimeoutCallback, this); |
| |
| return err; |
| } |
| |
| CHIP_ERROR ReadClient::ComputeLivenessCheckTimerTimeout(System::Clock::Timeout * aTimeout) |
| { |
| if (mLivenessTimeoutOverride != System::Clock::kZero) |
| { |
| *aTimeout = mLivenessTimeoutOverride; |
| return CHIP_NO_ERROR; |
| } |
| |
| VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE); |
| |
| // |
| // To calculate the duration we're willing to wait for a report to come to us, we take into account the maximum interval of |
| // the subscription AND the time it takes for the report to make it to us in the worst case. This latter bit involves |
| // computing the Ack timeout from the publisher for the ReportData message being sent to us using our IDLE interval as the |
| // basis for that computation. |
| // |
| // Make sure to use the retransmission computation that includes backoff. For purposes of that computation, treat us as |
| // active now (since we are right now sending/receiving messages), and use the default "how long are we guaranteed to stay |
| // active" threshold for now. |
| // |
| // TODO: We need to find a good home for this logic that will correctly compute this based on transport. For now, this will |
| // suffice since we don't use TCP as a transport currently and subscriptions over BLE aren't really a thing. |
| // |
| const auto & ourMrpConfig = GetDefaultMRPConfig(); |
| auto publisherTransmissionTimeout = |
| GetRetransmissionTimeout(ourMrpConfig.mActiveRetransTimeout, ourMrpConfig.mIdleRetransTimeout, |
| System::SystemClock().GetMonotonicTimestamp(), ourMrpConfig.mActiveThresholdTime); |
| *aTimeout = System::Clock::Seconds16(mMaxInterval) + publisherTransmissionTimeout; |
| return CHIP_NO_ERROR; |
| } |
| |
| void ReadClient::CancelLivenessCheckTimer() |
| { |
| InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer( |
| OnLivenessTimeoutCallback, this); |
| } |
| |
| void ReadClient::CancelResubscribeTimer() |
| { |
| InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer( |
| OnResubscribeTimerCallback, this); |
| mIsResubscriptionScheduled = false; |
| } |
| |
| void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState) |
| { |
| ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState); |
| |
| // |
| // Might as well try to see if this instance exists in the tracked list in the IM. |
| // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since |
| // been shutdown at which point the client wouldn't exist in the active read client list. |
| // |
| VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this)); |
| |
| ChipLogError(DataManagement, |
| "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64, |
| _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId())); |
| |
| // We didn't get a message from the server on time; it's possible that it no |
| // longer has a useful CASE session to us. Mark defunct all sessions that |
| // have not seen peer activity in at least as long as our session. |
| const auto & holder = _this->mReadPrepareParams.mSessionHolder; |
| if (holder) |
| { |
| System::Clock::Timestamp lastPeerActivity = holder->AsSecureSession()->GetLastPeerActivityTime(); |
| _this->mpImEngine->GetExchangeManager()->GetSessionManager()->ForEachMatchingSession( |
| _this->mPeer, [&lastPeerActivity](auto * session) { |
| if (!session->IsCASESession()) |
| { |
| return; |
| } |
| |
| if (session->GetLastPeerActivityTime() > lastPeerActivity) |
| { |
| return; |
| } |
| |
| session->MarkAsDefunct(); |
| }); |
| } |
| |
| // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e |
| // response timeouts). |
| _this->Close(CHIP_ERROR_TIMEOUT); |
| } |
| |
| CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload) |
| { |
| System::PacketBufferTLVReader reader; |
| reader.Init(std::move(aPayload)); |
| |
| SubscribeResponseMessage::Parser subscribeResponse; |
| ReturnErrorOnFailure(subscribeResponse.Init(reader)); |
| |
| #if CHIP_CONFIG_IM_PRETTY_PRINT |
| subscribeResponse.PrettyPrint(); |
| #endif |
| |
| SubscriptionId subscriptionId = 0; |
| VerifyOrReturnError(subscribeResponse.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, CHIP_ERROR_INVALID_ARGUMENT); |
| VerifyOrReturnError(IsMatchingSubscriptionId(subscriptionId), CHIP_ERROR_INVALID_SUBSCRIPTION); |
| ReturnErrorOnFailure(subscribeResponse.GetMaxInterval(&mMaxInterval)); |
| |
| ChipLogProgress(DataManagement, |
| "Subscription established with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u" |
| "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64, |
| mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId())); |
| |
| ReturnErrorOnFailure(subscribeResponse.ExitContainer()); |
| |
| MoveToState(ClientState::SubscriptionActive); |
| |
| mpCallback.OnSubscriptionEstablished(subscriptionId); |
| |
| mNumRetries = 0; |
| |
| ReturnErrorOnFailure(RefreshLivenessCheckTimer()); |
| |
| return CHIP_NO_ERROR; |
| } |
| |
| CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams) |
| { |
| mReadPrepareParams = std::move(aReadPrepareParams); |
| CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams); |
| if (err != CHIP_NO_ERROR) |
| { |
| StopResubscription(); |
| } |
| return err; |
| } |
| |
| CHIP_ERROR ReadClient::SendAutoResubscribeRequest(const ScopedNodeId & aPublisherId, ReadPrepareParams && aReadPrepareParams) |
| { |
| mPeer = aPublisherId; |
| mReadPrepareParams = std::move(aReadPrepareParams); |
| CHIP_ERROR err = EstablishSessionToPeer(); |
| if (err != CHIP_NO_ERROR) |
| { |
| // Make sure we call our callback's OnDeallocatePaths. |
| StopResubscription(); |
| } |
| return err; |
| } |
| |
| CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepareParams) |
| { |
| VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds, |
| CHIP_ERROR_INVALID_ARGUMENT); |
| |
| return SendSubscribeRequestImpl(aReadPrepareParams); |
| } |
| |
| CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams) |
| { |
| VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE); |
| |
| if (&aReadPrepareParams != &mReadPrepareParams) |
| { |
| mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder; |
| } |
| |
| mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds; |
| |
| // Todo: Remove the below, Update span in ReadPrepareParams |
| Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList, |
| aReadPrepareParams.mAttributePathParamsListSize); |
| Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize); |
| Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList, |
| aReadPrepareParams.mDataVersionFilterListSize); |
| |
| System::PacketBufferHandle msgBuf; |
| System::PacketBufferTLVWriter writer; |
| SubscribeRequestMessage::Builder request; |
| InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead); |
| |
| ReturnErrorOnFailure(request.Init(&writer)); |
| |
| request.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions) |
| .MinIntervalFloorSeconds(aReadPrepareParams.mMinIntervalFloorSeconds) |
| .MaxIntervalCeilingSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds); |
| |
| if (!attributePaths.empty()) |
| { |
| AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests(); |
| ReturnErrorOnFailure(attributePathListBuilder.GetError()); |
| ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths)); |
| } |
| |
| if (!eventPaths.empty()) |
| { |
| EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests(); |
| ReturnErrorOnFailure(eventPathListBuilder.GetError()); |
| ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths)); |
| |
| Optional<EventNumber> eventMin; |
| ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin)); |
| if (eventMin.HasValue()) |
| { |
| EventFilterIBs::Builder & eventFilters = request.CreateEventFilters(); |
| ReturnErrorOnFailure(request.GetError()); |
| ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value())); |
| } |
| } |
| |
| ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError()); |
| |
| bool encodedDataVersionList = false; |
| TLV::TLVWriter backup; |
| request.Checkpoint(backup); |
| DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters(); |
| ReturnErrorOnFailure(request.GetError()); |
| if (!attributePaths.empty()) |
| { |
| ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters, |
| encodedDataVersionList)); |
| } |
| ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead)); |
| if (encodedDataVersionList) |
| { |
| ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs()); |
| } |
| else |
| { |
| request.Rollback(backup); |
| } |
| |
| ReturnErrorOnFailure(request.EndOfSubscribeRequestMessage()); |
| ReturnErrorOnFailure(writer.Finalize(&msgBuf)); |
| |
| VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION); |
| |
| auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this); |
| if (exchange == nullptr) |
| { |
| if (aReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession()) |
| { |
| return CHIP_ERROR_NO_MEMORY; |
| } |
| |
| // Trying to subscribe with a defunct session somehow. |
| return CHIP_ERROR_INCORRECT_STATE; |
| } |
| |
| mExchange.Grab(exchange); |
| |
| if (aReadPrepareParams.mTimeout == System::Clock::kZero) |
| { |
| mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime); |
| } |
| else |
| { |
| mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout); |
| } |
| |
| ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf), |
| Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse))); |
| |
| mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer(); |
| MoveToState(ClientState::AwaitingInitialReport); |
| |
| return CHIP_NO_ERROR; |
| } |
| |
| CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause) |
| { |
| VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); |
| |
| auto timeTillNextResubscription = ComputeTimeTillNextSubscription(); |
| ChipLogProgress(DataManagement, |
| "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 |
| "ms due to error %" CHIP_ERROR_FORMAT, |
| GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription, |
| aTerminationCause.Format()); |
| ReturnErrorOnFailure(ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT)); |
| return CHIP_NO_ERROR; |
| } |
| |
| void ReadClient::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr, |
| const SessionHandle & sessionHandle) |
| { |
| ReadClient * const _this = static_cast<ReadClient *>(context); |
| VerifyOrDie(_this != nullptr); |
| |
| ChipLogProgress(DataManagement, "HandleDeviceConnected"); |
| _this->mReadPrepareParams.mSessionHolder.Grab(sessionHandle); |
| _this->mpExchangeMgr = &exchangeMgr; |
| |
| _this->mpCallback.OnCASESessionEstablished(sessionHandle, _this->mReadPrepareParams); |
| |
| auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams); |
| if (err != CHIP_NO_ERROR) |
| { |
| _this->Close(err); |
| } |
| } |
| |
| void ReadClient::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err) |
| { |
| ReadClient * const _this = static_cast<ReadClient *>(context); |
| VerifyOrDie(_this != nullptr); |
| |
| ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'", err.Format()); |
| |
| _this->Close(err); |
| } |
| |
| void ReadClient::OnResubscribeTimerCallback(System::Layer * /* If this starts being used, fix callers that pass nullptr */, |
| void * apAppState) |
| { |
| ReadClient * const _this = static_cast<ReadClient *>(apAppState); |
| VerifyOrDie(_this != nullptr); |
| |
| _this->mIsResubscriptionScheduled = false; |
| |
| CHIP_ERROR err; |
| |
| ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub); |
| _this->mNumRetries++; |
| |
| bool allowResubscribeOnError = true; |
| if (!_this->mReadPrepareParams.mSessionHolder || |
| !_this->mReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession()) |
| { |
| // We don't have an active CASE session. We need to go ahead and set |
| // one up, if we can. |
| if (_this->EstablishSessionToPeer() == CHIP_NO_ERROR) |
| { |
| return; |
| } |
| |
| if (_this->mForceCaseOnNextResub) |
| { |
| // Caller asked us to force CASE but we have no way to do CASE. |
| // Just stop trying. |
| allowResubscribeOnError = false; |
| } |
| |
| // No way to send our subscribe request. |
| err = CHIP_ERROR_INCORRECT_STATE; |
| ExitNow(); |
| } |
| |
| err = _this->SendSubscribeRequest(_this->mReadPrepareParams); |
| |
| exit: |
| if (err != CHIP_NO_ERROR) |
| { |
| // |
| // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid |
| // CASESessionManager pointer when mForceCaseOnNextResub was true. |
| // |
| // In that case, don't permit re-subscription to occur. |
| // |
| _this->Close(err, allowResubscribeOnError); |
| } |
| } |
| |
| void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath) |
| { |
| for (size_t index = 0; index < mReadPrepareParams.mDataVersionFilterListSize; index++) |
| { |
| if (mReadPrepareParams.mpDataVersionFilterList[index].mEndpointId == aPath.mEndpointId && |
| mReadPrepareParams.mpDataVersionFilterList[index].mClusterId == aPath.mClusterId) |
| { |
| // Now we know the current version for this cluster is aPath.mDataVersion. |
| mReadPrepareParams.mpDataVersionFilterList[index].mDataVersion = aPath.mDataVersion; |
| } |
| } |
| } |
| |
| CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin) |
| { |
| if (aReadPrepareParams.mEventNumber.HasValue()) |
| { |
| aEventMin = aReadPrepareParams.mEventNumber; |
| } |
| else |
| { |
| ReturnErrorOnFailure(mpCallback.GetHighestReceivedEventNumber(aEventMin)); |
| if (aEventMin.HasValue()) |
| { |
| // We want to start with the first event _after_ the last one we received. |
| aEventMin.SetValue(aEventMin.Value() + 1); |
| } |
| } |
| return CHIP_NO_ERROR; |
| } |
| |
| void ReadClient::TriggerResubscribeIfScheduled(const char * reason) |
| { |
| if (!mIsResubscriptionScheduled) |
| { |
| return; |
| } |
| |
| ChipLogDetail(DataManagement, "ReadClient[%p] triggering resubscribe, reason: %s", this, reason); |
| CancelResubscribeTimer(); |
| OnResubscribeTimerCallback(nullptr, this); |
| } |
| |
| Optional<System::Clock::Timeout> ReadClient::GetSubscriptionTimeout() |
| { |
| if (!IsSubscriptionType() || !IsSubscriptionActive()) |
| { |
| return NullOptional; |
| } |
| |
| System::Clock::Timeout timeout; |
| CHIP_ERROR err = ComputeLivenessCheckTimerTimeout(&timeout); |
| if (err != CHIP_NO_ERROR) |
| { |
| return NullOptional; |
| } |
| |
| return MakeOptional(timeout); |
| } |
| |
| CHIP_ERROR ReadClient::EstablishSessionToPeer() |
| { |
| ChipLogProgress(DataManagement, "Trying to establish a CASE session for subscription"); |
| auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); |
| VerifyOrReturnError(caseSessionManager != nullptr, CHIP_ERROR_INCORRECT_STATE); |
| caseSessionManager->FindOrEstablishSession(mPeer, &mOnConnectedCallback, &mOnConnectionFailureCallback); |
| return CHIP_NO_ERROR; |
| } |
| |
| } // namespace app |
| } // namespace chip |