/* | |
* IoT MQTT V2.1.0 | |
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy of | |
* this software and associated documentation files (the "Software"), to deal in | |
* the Software without restriction, including without limitation the rights to | |
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of | |
* the Software, and to permit persons to whom the Software is furnished to do so, | |
* subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS | |
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER | |
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
*/ | |
/** | |
* @file iot_mqtt_operation.c | |
* @brief Implements functions that process MQTT operations. | |
*/ | |
/* The config header is always included first. */ | |
#include "iot_config.h" | |
/* Standard includes. */ | |
#include <string.h> | |
/* Error handling include. */ | |
#include "iot_error.h" | |
/* MQTT internal include. */ | |
#include "private/iot_mqtt_internal.h" | |
/* Platform layer includes. */ | |
#include "platform/iot_clock.h" | |
#include "platform/iot_threads.h" | |
/* Atomics include. */ | |
#include "iot_atomic.h" | |
/** | |
* @cond DOXYGEN_IGNORE | |
* Doxygen should ignore this section. | |
* | |
* Declaration of local MQTT serializer override selectors | |
*/ | |
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttPublishSetDup_t, | |
_getMqttPublishSetDupFunc, | |
_IotMqtt_PublishSetDup, | |
serialize.publishSetDup ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t, | |
_getMqttFreePacketFunc, | |
_IotMqtt_FreePacket, | |
freePacket ) | |
#else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
#define _getMqttFreePacketFunc( pSerializer ) _IotMqtt_FreePacket | |
#define _getMqttPublishSetDupFunc( pSerializer ) _IotMqtt_PublishSetDup | |
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
/** @endcond */ | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief First parameter to #_mqttOperation_match. | |
*/ | |
typedef struct _operationMatchParam | |
{ | |
IotMqttOperationType_t type; /**< @brief The type of operation to look for. */ | |
const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation. | |
* Set to `NULL` to ignore packet identifier. */ | |
} _operationMatchParam_t; | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief Match an MQTT operation by type and packet identifier. | |
* | |
* @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t. | |
* @param[in] pMatch Pointer to an #_operationMatchParam_t. | |
* | |
* @return `true` if the operation matches the parameters in `pArgument`; `false` | |
* otherwise. | |
*/ | |
static bool _mqttOperation_match( const IotLink_t * pOperationLink, | |
void * pMatch ); | |
/** | |
* @brief Check if an operation with retry has exceeded its retry limit. | |
* | |
* If a PUBLISH operation is available for retry, this function also sets any | |
* necessary DUP flags. | |
* | |
* @param[in] pOperation The operation to check. | |
* | |
* @return `true` if the operation may be retried; `false` otherwise. | |
*/ | |
static bool _checkRetryLimit( _mqttOperation_t * pOperation ); | |
/** | |
* @brief Schedule the next send of an operation with retry. | |
* | |
* @param[in] pOperation The operation to schedule. | |
* | |
* @return `true` if the reschedule succeeded; `false` otherwise. | |
*/ | |
static bool _scheduleNextRetry( _mqttOperation_t * pOperation ); | |
/*-----------------------------------------------------------*/ | |
static bool _mqttOperation_match( const IotLink_t * pOperationLink, | |
void * pMatch ) | |
{ | |
bool match = false; | |
/* Because this function is called from a container function, the given link | |
* must never be NULL. */ | |
IotMqtt_Assert( pOperationLink != NULL ); | |
_mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t, | |
pOperationLink, | |
link ); | |
_operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch; | |
/* Check for matching operations. */ | |
if( pParam->type == pOperation->u.operation.type ) | |
{ | |
/* Check for matching packet identifiers. */ | |
if( pParam->pPacketIdentifier == NULL ) | |
{ | |
match = true; | |
} | |
else | |
{ | |
match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier ); | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return match; | |
} | |
/*-----------------------------------------------------------*/ | |
static bool _checkRetryLimit( _mqttOperation_t * pOperation ) | |
{ | |
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; | |
bool status = true, setDup = false; | |
/* Only PUBLISH may be retried. */ | |
IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER ); | |
/* Check if the retry limit is exhausted. */ | |
if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit ) | |
{ | |
/* The retry count may be at most one more than the retry limit, which | |
* accounts for the final check for a PUBACK. */ | |
IotMqtt_Assert( pOperation->u.operation.periodic.retry.count == | |
pOperation->u.operation.periodic.retry.limit + 1 ); | |
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.", | |
pMqttConnection, | |
pOperation, | |
pOperation->u.operation.periodic.retry.limit ); | |
status = false; | |
} | |
else | |
{ | |
if( pOperation->u.operation.periodic.retry.count == 1 ) | |
{ | |
/* The DUP flag should always be set on the first retry. */ | |
setDup = true; | |
} | |
else if( pMqttConnection->awsIotMqttMode == true ) | |
{ | |
/* In AWS IoT MQTT mode, the DUP flag (really a change to the packet | |
* identifier) must be reset on every retry. */ | |
setDup = true; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( setDup == true ) | |
{ | |
/* In AWS IoT MQTT mode, the references mutex must be locked to | |
* prevent the packet identifier from being read while it is being | |
* changed. */ | |
if( pMqttConnection->awsIotMqttMode == true ) | |
{ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Set the DUP flag */ | |
_getMqttPublishSetDupFunc( pMqttConnection->pSerializer )( pOperation->u.operation.pMqttPacket, | |
pOperation->u.operation.pPacketIdentifierHigh, | |
&( pOperation->u.operation.packetIdentifier ) ); | |
if( pMqttConnection->awsIotMqttMode == true ) | |
{ | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
static bool _scheduleNextRetry( _mqttOperation_t * pOperation ) | |
{ | |
bool firstRetry = false; | |
uint32_t scheduleDelay = 0; | |
IotMqttError_t status = IOT_MQTT_STATUS_PENDING; | |
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; | |
/* This function should never be called with retry count greater than | |
* retry limit. */ | |
IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <= | |
pOperation->u.operation.periodic.retry.limit ); | |
/* Increment the retry count. */ | |
( pOperation->u.operation.periodic.retry.count )++; | |
/* Check for a response shortly for the final retry. Otherwise, calculate the | |
* next retry period. */ | |
if( pOperation->u.operation.periodic.retry.count > | |
pOperation->u.operation.periodic.retry.limit ) | |
{ | |
scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS; | |
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check " | |
"for response in %d ms.", | |
pMqttConnection, | |
pOperation, | |
IOT_MQTT_RESPONSE_WAIT_MS ); | |
} | |
else | |
{ | |
scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs; | |
/* Double the retry period, subject to a ceiling value. */ | |
pOperation->u.operation.periodic.retry.nextPeriodMs *= 2; | |
if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING ) | |
{ | |
pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.", | |
pMqttConnection, | |
pOperation, | |
( unsigned long ) pOperation->u.operation.periodic.retry.count, | |
( unsigned long ) pOperation->u.operation.periodic.retry.limit, | |
( unsigned long ) scheduleDelay ); | |
/* Check if this is the first retry. */ | |
firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 ); | |
/* On the first retry, the PUBLISH will be moved from the pending processing | |
* list to the pending responses list. Lock the connection references mutex | |
* to manipulate the lists. */ | |
if( firstRetry == true ) | |
{ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/* Reschedule the PUBLISH for another send. */ | |
status = _IotMqtt_ScheduleOperation( pOperation, | |
_IotMqtt_ProcessSend, | |
scheduleDelay ); | |
/* Check for successful reschedule. */ | |
if( status == IOT_MQTT_SUCCESS ) | |
{ | |
/* Move a successfully rescheduled PUBLISH from the pending processing | |
* list to the pending responses list on the first retry. */ | |
if( firstRetry == true ) | |
{ | |
if( IotLink_IsLinked( &( pOperation->link ) ) == true ) | |
{ | |
IotListDouble_Remove( &( pOperation->link ) ); | |
} | |
IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ), | |
&( pOperation->link ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* The references mutex only needs to be unlocked on the first retry, since | |
* only the first retry manipulates the connection lists. */ | |
if( firstRetry == true ) | |
{ | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return( status == IOT_MQTT_SUCCESS ); | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
_mqttOperation_t ** pNewOperation ) | |
{ | |
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
bool decrementOnError = false; | |
_mqttOperation_t * pOperation = NULL; | |
bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ); | |
/* If the waitable flag is set, make sure that there's no callback. */ | |
if( waitable == true ) | |
{ | |
if( pCallbackInfo != NULL ) | |
{ | |
IotLogError( "Callback should not be set for a waitable operation." ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotLogDebug( "(MQTT connection %p) Creating new operation record.", | |
pMqttConnection ); | |
/* Increment the reference count for the MQTT connection when creating a new | |
* operation. */ | |
if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false ) | |
{ | |
IotLogError( "(MQTT connection %p) New operation record cannot be created" | |
" for a closed connection", | |
pMqttConnection ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR ); | |
} | |
else | |
{ | |
/* Reference count will need to be decremented on error. */ | |
decrementOnError = true; | |
} | |
/* Allocate memory for a new operation. */ | |
pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) ); | |
if( pOperation == NULL ) | |
{ | |
IotLogError( "(MQTT connection %p) Failed to allocate memory for new " | |
"operation record.", | |
pMqttConnection ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY ); | |
} | |
else | |
{ | |
/* Clear the operation data. */ | |
( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) ); | |
/* Initialize some members of the new operation. */ | |
pOperation->pMqttConnection = pMqttConnection; | |
pOperation->u.operation.jobReference = 1; | |
pOperation->u.operation.flags = flags; | |
pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING; | |
} | |
/* Check if the waitable flag is set. If it is, create a semaphore to | |
* wait on. */ | |
if( waitable == true ) | |
{ | |
/* Create a semaphore to wait on for a waitable operation. */ | |
if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false ) | |
{ | |
IotLogError( "(MQTT connection %p) Failed to create semaphore for " | |
"waitable operation.", | |
pMqttConnection ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY ); | |
} | |
else | |
{ | |
/* A waitable operation is created with an additional reference for the | |
* Wait function. */ | |
( pOperation->u.operation.jobReference )++; | |
} | |
} | |
else | |
{ | |
/* If the waitable flag isn't set but a callback is, copy the callback | |
* information. */ | |
if( pCallbackInfo != NULL ) | |
{ | |
pOperation->u.operation.notify.callback = *pCallbackInfo; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/* Add this operation to the MQTT connection's operation list. */ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ), | |
&( pOperation->link ) ); | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
/* Set the output parameter. */ | |
*pNewOperation = pOperation; | |
/* Clean up operation and decrement reference count if this function failed. */ | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
if( decrementOnError == true ) | |
{ | |
_IotMqtt_DecrementConnectionReferences( pMqttConnection ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( pOperation != NULL ) | |
{ | |
IotMqtt_FreeOperation( pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IOT_FUNCTION_CLEANUP_END(); | |
} | |
/*-----------------------------------------------------------*/ | |
bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation, | |
bool cancelJob ) | |
{ | |
bool destroyOperation = false; | |
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; | |
/* Attempt to cancel the operation's job. */ | |
if( cancelJob == true ) | |
{ | |
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, | |
pOperation->job, | |
NULL ); | |
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Decrement job reference count. */ | |
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) | |
{ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
pOperation->u.operation.jobReference--; | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed" | |
" from %ld to %ld.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation, | |
( long ) ( pOperation->u.operation.jobReference + 1 ), | |
( long ) ( pOperation->u.operation.jobReference ) ); | |
/* The job reference count must be 0 or 1 after the decrement. */ | |
IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) || | |
( pOperation->u.operation.jobReference == 1 ) ); | |
/* This operation may be destroyed if its reference count is 0. */ | |
if( pOperation->u.operation.jobReference == 0 ) | |
{ | |
destroyOperation = true; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return destroyOperation; | |
} | |
/*-----------------------------------------------------------*/ | |
void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation ) | |
{ | |
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
/* The job reference count must be between 0 and 2. */ | |
IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) && | |
( pOperation->u.operation.jobReference <= 2 ) ); | |
/* Jobs to be destroyed should be removed from the MQTT connection's | |
* lists. */ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
if( IotLink_IsLinked( &( pOperation->link ) ) == true ) | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation, | |
pMqttConnection ); | |
IotListDouble_Remove( &( pOperation->link ) ); | |
} | |
else | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
} | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
/* Free any allocated MQTT packet. */ | |
if( pOperation->u.operation.pMqttPacket != NULL ) | |
{ | |
_getMqttFreePacketFunc( pMqttConnection->pSerializer )( pOperation->u.operation.pMqttPacket ); | |
IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
} | |
else | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
} | |
/* Check if a wait semaphore was created for this operation. */ | |
if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ) | |
{ | |
IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) ); | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
/* Free the memory used to hold operation data. */ | |
IotMqtt_FreeOperation( pOperation ); | |
/* Decrement the MQTT connection's reference count after destroying an | |
* operation. */ | |
_IotMqtt_DecrementConnectionReferences( pMqttConnection ); | |
} | |
/*-----------------------------------------------------------*/ | |
void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool, | |
IotTaskPoolJob_t pKeepAliveJob, | |
void * pContext ) | |
{ | |
bool status = true; | |
uint32_t swapStatus = 0; | |
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
size_t bytesSent = 0; | |
/* Swap status is not checked when asserts are disabled. */ | |
( void ) swapStatus; | |
/* Retrieve the MQTT connection from the context. */ | |
_mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext; | |
_mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq ); | |
/* Check parameters. */ | |
IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job ); | |
/* Check that keep-alive interval is valid. The MQTT spec states its maximum | |
* value is 65,535 seconds. */ | |
IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 ); | |
/* Only two values are valid for the next keep alive job delay. */ | |
IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs == | |
pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) || | |
( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs | |
== IOT_MQTT_RESPONSE_WAIT_MS ) ); | |
IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection ); | |
/* Determine whether to send a PINGREQ or check for PINGRESP. */ | |
if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs == | |
pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) | |
{ | |
IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection ); | |
/* Because PINGREQ may be used to keep the MQTT connection alive, it is | |
* more important than other operations. Bypass the queue of jobs for | |
* operations by directly sending the PINGREQ in this job. */ | |
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection, | |
pPingreqOperation->u.operation.pMqttPacket, | |
pPingreqOperation->u.operation.packetSize ); | |
if( bytesSent != pPingreqOperation->u.operation.packetSize ) | |
{ | |
IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection ); | |
status = false; | |
} | |
else | |
{ | |
/* Assume the keep-alive will fail. The network receive callback will | |
* clear the failure flag upon receiving a PINGRESP. */ | |
swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), | |
1, | |
0 ); | |
IotMqtt_Assert( swapStatus == 1 ); | |
/* Schedule a check for PINGRESP. */ | |
pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS; | |
IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.", | |
pMqttConnection, | |
IOT_MQTT_RESPONSE_WAIT_MS ); | |
} | |
} | |
else | |
{ | |
IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection ); | |
if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 ) | |
{ | |
IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection ); | |
/* PINGRESP was received. Schedule the next PINGREQ transmission. */ | |
pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = | |
pPingreqOperation->u.operation.periodic.ping.keepAliveMs; | |
} | |
else | |
{ | |
IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.", | |
pMqttConnection, | |
IOT_MQTT_RESPONSE_WAIT_MS ); | |
/* The network receive callback did not clear the failure flag. */ | |
status = false; | |
} | |
} | |
/* When a PINGREQ is successfully sent, reschedule this job to check for a | |
* response shortly. */ | |
if( status == true ) | |
{ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
/* Re-create the keep-alive job for rescheduling. This should never fail. */ | |
taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive, | |
pContext, | |
IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ), | |
&pKeepAliveJob ); | |
IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS ); | |
taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool, | |
pKeepAliveJob, | |
pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ); | |
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) | |
{ | |
IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.", | |
pMqttConnection, | |
( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ); | |
} | |
else | |
{ | |
IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.", | |
pMqttConnection, | |
IotTaskPool_strerror( taskPoolStatus ) ); | |
status = false; | |
} | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Close the connection on failures. */ | |
if( status == false ) | |
{ | |
_IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT, | |
pMqttConnection ); | |
/* Keep-alive has failed and will no longer use this MQTT connection. */ | |
_IotMqtt_DecrementConnectionReferences( pMqttConnection ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/*-----------------------------------------------------------*/ | |
void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool, | |
IotTaskPoolJob_t pPublishJob, | |
void * pContext ) | |
{ | |
_mqttOperation_t * pOperation = pContext; | |
IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL }; | |
/* Check parameters. The task pool and job parameter is not used when asserts | |
* are disabled. */ | |
( void ) pTaskPool; | |
( void ) pPublishJob; | |
IotMqtt_Assert( pOperation->incomingPublish == true ); | |
IotMqtt_Assert( pPublishJob == pOperation->job ); | |
/* Remove the operation from the pending processing list. */ | |
IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) ); | |
if( IotLink_IsLinked( &( pOperation->link ) ) == true ) | |
{ | |
IotListDouble_Remove( &( pOperation->link ) ); | |
} | |
else | |
{ | |
/* This operation may have already been removed by cleanup of pending | |
* operations (called from Disconnect). In that case, do nothing here. */ | |
EMPTY_ELSE_MARKER; | |
} | |
IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) ); | |
/* Process the current PUBLISH. */ | |
callbackParam.u.message.info = pOperation->u.publish.publishInfo; | |
_IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection, | |
&callbackParam ); | |
/* Free buffers associated with the current PUBLISH message. */ | |
IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL ); | |
IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData ); | |
/* Free the incoming PUBLISH operation. */ | |
IotMqtt_FreeOperation( pOperation ); | |
} | |
/*-----------------------------------------------------------*/ | |
void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool, | |
IotTaskPoolJob_t pSendJob, | |
void * pContext ) | |
{ | |
size_t bytesSent = 0; | |
bool destroyOperation = false, waitable = false, networkPending = false; | |
_mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext; | |
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; | |
/* Check parameters. The task pool and job parameter is not used when asserts | |
* are disabled. */ | |
( void ) pTaskPool; | |
( void ) pSendJob; | |
IotMqtt_Assert( pSendJob == pOperation->job ); | |
/* The given operation must have an allocated packet and be waiting for a status. */ | |
IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); | |
IotMqtt_Assert( pOperation->u.operation.packetSize != 0 ); | |
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); | |
/* Check if this operation is waitable. */ | |
waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE; | |
/* Check PUBLISH retry counts and limits. */ | |
if( pOperation->u.operation.periodic.retry.limit > 0 ) | |
{ | |
if( _checkRetryLimit( pOperation ) == false ) | |
{ | |
pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Send an operation that is waiting for a response. */ | |
if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ) | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.", | |
pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
/* Transmit the MQTT packet from the operation over the network. */ | |
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection, | |
pOperation->u.operation.pMqttPacket, | |
pOperation->u.operation.packetSize ); | |
/* Check transmission status. */ | |
if( bytesSent != pOperation->u.operation.packetSize ) | |
{ | |
pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR; | |
} | |
else | |
{ | |
/* DISCONNECT operations are considered successful upon successful | |
* transmission. In addition, non-waitable operations with no callback | |
* may also be considered successful. */ | |
if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT ) | |
{ | |
/* DISCONNECT operations are always waitable. */ | |
IotMqtt_Assert( waitable == true ); | |
pOperation->u.operation.status = IOT_MQTT_SUCCESS; | |
} | |
else if( waitable == false ) | |
{ | |
if( pOperation->u.operation.notify.callback.function == NULL ) | |
{ | |
pOperation->u.operation.status = IOT_MQTT_SUCCESS; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check if this operation requires further processing. */ | |
if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ) | |
{ | |
/* Check if this operation should be scheduled for retransmission. */ | |
if( pOperation->u.operation.periodic.retry.limit > 0 ) | |
{ | |
if( _scheduleNextRetry( pOperation ) == false ) | |
{ | |
pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR; | |
} | |
else | |
{ | |
/* A successfully scheduled PUBLISH retry is awaiting a response | |
* from the network. */ | |
networkPending = true; | |
} | |
} | |
else | |
{ | |
/* Decrement reference count to signal completion of send job. Check | |
* if the operation should be destroyed. */ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
if( waitable == true ) | |
{ | |
destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* If the operation should not be destroyed, transfer it from the | |
* pending processing to the pending response list. */ | |
if( destroyOperation == false ) | |
{ | |
if( IotLink_IsLinked( &( pOperation->link ) ) == true ) | |
{ | |
IotListDouble_Remove( &( pOperation->link ) ); | |
} | |
IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ), | |
&( pOperation->link ) ); | |
/* This operation is now awaiting a response from the network. */ | |
networkPending = true; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Destroy the operation or notify of completion if necessary. */ | |
if( destroyOperation == true ) | |
{ | |
_IotMqtt_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
/* Do not check the operation status if a network response is pending, | |
* since a network response could modify the status. */ | |
if( networkPending == false ) | |
{ | |
/* Notify of operation completion if this job set a status. */ | |
if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING ) | |
{ | |
_IotMqtt_Notify( pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
} | |
/*-----------------------------------------------------------*/ | |
void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool, | |
IotTaskPoolJob_t pOperationJob, | |
void * pContext ) | |
{ | |
_mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext; | |
IotMqttCallbackParam_t callbackParam = { 0 }; | |
/* Check parameters. The task pool and job parameter is not used when asserts | |
* are disabled. */ | |
( void ) pTaskPool; | |
( void ) pOperationJob; | |
IotMqtt_Assert( pOperationJob == pOperation->job ); | |
/* The operation's callback function and status must be set. */ | |
IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL ); | |
IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING ); | |
callbackParam.mqttConnection = pOperation->pMqttConnection; | |
callbackParam.u.operation.type = pOperation->u.operation.type; | |
callbackParam.u.operation.reference = pOperation; | |
callbackParam.u.operation.result = pOperation->u.operation.status; | |
/* Invoke the user callback function. */ | |
pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext, | |
&callbackParam ); | |
/* Attempt to destroy the operation once the user callback returns. */ | |
if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true ) | |
{ | |
_IotMqtt_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation, | |
IotTaskPoolRoutine_t jobRoutine, | |
uint32_t delay ) | |
{ | |
IotMqttError_t status = IOT_MQTT_SUCCESS; | |
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
/* Check that job routine is valid. */ | |
IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) || | |
( jobRoutine == _IotMqtt_ProcessCompletedOperation ) || | |
( jobRoutine == _IotMqtt_ProcessIncomingPublish ) ); | |
/* Creating a new job should never fail when parameters are valid. */ | |
taskPoolStatus = IotTaskPool_CreateJob( jobRoutine, | |
pOperation, | |
&( pOperation->jobStorage ), | |
&( pOperation->job ) ); | |
IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS ); | |
/* Schedule the new job with a delay. */ | |
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL, | |
pOperation->job, | |
delay ); | |
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS ) | |
{ | |
/* Scheduling a newly-created job should never be invalid or illegal. */ | |
IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER ); | |
IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION ); | |
IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.", | |
pOperation->pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation, | |
IotTaskPool_strerror( taskPoolStatus ) ); | |
status = IOT_MQTT_SCHEDULING_ERROR; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
_mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection, | |
IotMqttOperationType_t type, | |
const uint16_t * pPacketIdentifier ) | |
{ | |
bool waitable = false; | |
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
_mqttOperation_t * pResult = NULL; | |
IotLink_t * pResultLink = NULL; | |
_operationMatchParam_t operationMatchParams = { 0 }; | |
/* Set the members of the search parameter. */ | |
operationMatchParams.type = type; | |
operationMatchParams.pPacketIdentifier = pPacketIdentifier; | |
if( pPacketIdentifier != NULL ) | |
{ | |
IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response " | |
"with packet identifier %hu.", | |
pMqttConnection, | |
IotMqtt_OperationType( type ), | |
*pPacketIdentifier ); | |
} | |
else | |
{ | |
IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.", | |
pMqttConnection, | |
IotMqtt_OperationType( type ) ); | |
} | |
/* Find and remove the first matching element in the list. */ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ), | |
NULL, | |
_mqttOperation_match, | |
&operationMatchParams ); | |
/* Check if a match was found. */ | |
if( pResultLink != NULL ) | |
{ | |
/* Get operation pointer and check if it is waitable. */ | |
pResult = IotLink_Container( _mqttOperation_t, pResultLink, link ); | |
waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE; | |
/* Check if the matched operation is a PUBLISH with retry. If it is, cancel | |
* the retry job. */ | |
if( pResult->u.operation.periodic.retry.limit > 0 ) | |
{ | |
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, | |
pResult->job, | |
NULL ); | |
/* If the retry job could not be canceled, then it is currently | |
* executing. Ignore the operation. */ | |
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS ) | |
{ | |
pResult = NULL; | |
} | |
else | |
{ | |
/* Check job reference counts. A waitable operation should have a | |
* count of 2; a non-waitable operation should have a count of 1. */ | |
IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) ); | |
} | |
} | |
else | |
{ | |
/* An operation with no retry in the pending responses list should | |
* always have a job reference of 1. */ | |
IotMqtt_Assert( pResult->u.operation.jobReference == 1 ); | |
/* Increment job references of a waitable operation to prevent Wait from | |
* destroying this operation if it times out. */ | |
if( waitable == true ) | |
{ | |
( pResult->u.operation.jobReference )++; | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.", | |
pMqttConnection, | |
IotMqtt_OperationType( type ), | |
pResult, | |
( long int ) ( pResult->u.operation.jobReference - 1 ), | |
( long int ) ( pResult->u.operation.jobReference ) ); | |
} | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( pResult != NULL ) | |
{ | |
IotLogDebug( "(MQTT connection %p) Found operation %s.", | |
pMqttConnection, | |
IotMqtt_OperationType( type ) ); | |
/* Remove the matched operation from the list. */ | |
IotListDouble_Remove( &( pResult->link ) ); | |
} | |
else | |
{ | |
IotLogDebug( "(MQTT connection %p) Operation %s not found.", | |
pMqttConnection, | |
IotMqtt_OperationType( type ) ); | |
} | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
return pResult; | |
} | |
/*-----------------------------------------------------------*/ | |
void _IotMqtt_Notify( _mqttOperation_t * pOperation ) | |
{ | |
IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR; | |
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection; | |
/* Check if operation is waitable. */ | |
bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE; | |
/* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected | |
* subscriptions are removed by the deserializer, so not removed here. */ | |
if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE ) | |
{ | |
switch( pOperation->u.operation.status ) | |
{ | |
case IOT_MQTT_SUCCESS: | |
break; | |
case IOT_MQTT_SERVER_REFUSED: | |
break; | |
default: | |
_IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection, | |
pOperation->u.operation.packetIdentifier, | |
-1 ); | |
break; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Schedule callback invocation for non-waitable operation. */ | |
if( waitable == false ) | |
{ | |
/* Non-waitable operation should have job reference of 1. */ | |
IotMqtt_Assert( pOperation->u.operation.jobReference == 1 ); | |
/* Schedule an invocation of the callback. */ | |
if( pOperation->u.operation.notify.callback.function != NULL ) | |
{ | |
IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
status = _IotMqtt_ScheduleOperation( pOperation, | |
_IotMqtt_ProcessCompletedOperation, | |
0 ); | |
if( status == IOT_MQTT_SUCCESS ) | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.", | |
pOperation->pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
/* Place the scheduled operation back in the list of operations pending | |
* processing. */ | |
if( IotLink_IsLinked( &( pOperation->link ) ) == true ) | |
{ | |
IotListDouble_Remove( &( pOperation->link ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ), | |
&( pOperation->link ) ); | |
} | |
else | |
{ | |
IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.", | |
pOperation->pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
} | |
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Operations that weren't scheduled may be destroyed. */ | |
if( status == IOT_MQTT_SCHEDULING_ERROR ) | |
{ | |
/* Decrement reference count of operations not scheduled. */ | |
if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true ) | |
{ | |
_IotMqtt_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
/* Post to a waitable operation's semaphore. */ | |
if( waitable == true ) | |
{ | |
IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation " | |
"notified of completion.", | |
pOperation->pMqttConnection, | |
IotMqtt_OperationType( pOperation->u.operation.type ), | |
pOperation ); | |
IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
} | |
else | |
{ | |
IotMqtt_Assert( status == IOT_MQTT_SUCCESS ); | |
} | |
} | |
/*-----------------------------------------------------------*/ |