/* | |
* IoT MQTT V2.1.0 | |
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy of | |
* this software and associated documentation files (the "Software"), to deal in | |
* the Software without restriction, including without limitation the rights to | |
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of | |
* the Software, and to permit persons to whom the Software is furnished to do so, | |
* subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS | |
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER | |
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
*/ | |
/** | |
* @file iot_mqtt_api.c | |
* @brief Implements most user-facing functions of the MQTT library. | |
*/ | |
/* The config header is always included first. */ | |
#include "iot_config.h" | |
/* Standard includes. */ | |
#include <string.h> | |
/* Error handling include. */ | |
#include "iot_error.h" | |
/* MQTT internal include. */ | |
#include "private/iot_mqtt_internal.h" | |
/* Platform layer includes. */ | |
#include "platform/iot_clock.h" | |
#include "platform/iot_threads.h" | |
/* Atomics include. */ | |
#include "iot_atomic.h" | |
/* Validate MQTT configuration settings. */ | |
#if IOT_MQTT_ENABLE_ASSERTS != 0 && IOT_MQTT_ENABLE_ASSERTS != 1 | |
#error "IOT_MQTT_ENABLE_ASSERTS must be 0 or 1." | |
#endif | |
#if IOT_MQTT_ENABLE_METRICS != 0 && IOT_MQTT_ENABLE_METRICS != 1 | |
#error "IOT_MQTT_ENABLE_METRICS must be 0 or 1." | |
#endif | |
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0 && IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 1 | |
#error "IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES must be 0 or 1." | |
#endif | |
#if IOT_MQTT_RESPONSE_WAIT_MS <= 0 | |
#error "IOT_MQTT_RESPONSE_WAIT_MS cannot be 0 or negative." | |
#endif | |
#if IOT_MQTT_RETRY_MS_CEILING <= 0 | |
#error "IOT_MQTT_RETRY_MS_CEILING cannot be 0 or negative." | |
#endif | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief Uninitialized value for @ref _initCalled. | |
*/ | |
#define MQTT_LIBRARY_UNINITIALIZED ( ( uint32_t ) 0 ) | |
/** | |
* @brief Initialized value for @ref _initCalled. | |
*/ | |
#define MQTT_LIBRARY_INITIALIZED ( ( uint32_t ) 1 ) | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief Check if the library is initialized. | |
* | |
* @return `true` if IotMqtt_Init was called; `false` otherwise. | |
*/ | |
static bool _checkInit( void ); | |
/** | |
* @brief Set the unsubscribed flag of an MQTT subscription. | |
* | |
* @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t. | |
* @param[in] pMatch Not used. | |
* | |
* @return Always returns `true`. | |
*/ | |
static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink, | |
void * pMatch ); | |
/** | |
* @brief Destroy an MQTT subscription if its reference count is 0. | |
* | |
* @param[in] pData The subscription to destroy. This parameter is of type | |
* `void*` for compatibility with [free] | |
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html). | |
*/ | |
static void _mqttSubscription_tryDestroy( void * pData ); | |
/** | |
* @brief Decrement the reference count of an MQTT operation and attempt to | |
* destroy it. | |
* | |
* @param[in] pData The operation data to destroy. This parameter is of type | |
* `void*` for compatibility with [free] | |
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html). | |
*/ | |
static void _mqttOperation_tryDestroy( void * pData ); | |
/** | |
* @brief Initialize the keep-alive operation for an MQTT connection. | |
* | |
* @param[in] pNetworkInfo User-provided network information for the new | |
* connection. | |
* @param[in] keepAliveSeconds User-provided keep-alive interval. | |
* @param[out] pMqttConnection The MQTT connection associated with the keep-alive. | |
* | |
* @return `true` if the keep-alive job was successfully created; `false` otherwise. | |
*/ | |
static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo, | |
uint16_t keepAliveSeconds, | |
_mqttConnection_t * pMqttConnection ); | |
/** | |
* @brief Initialize a network connection, creating it if necessary. | |
* | |
* @param[in] pNetworkInfo User-provided network information for the connection.
* @param[out] pNetworkConnection On success, the created and/or initialized network connection. | |
* @param[out] pCreatedNewNetworkConnection On success, `true` if a new network connection was created; `false` if an existing one will be used. | |
* | |
* @return Any #IotNetworkError_t, as defined by the network stack. | |
*/ | |
static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * pNetworkInfo, | |
IotNetworkConnection_t * pNetworkConnection, | |
bool * pCreatedNewNetworkConnection ); | |
/** | |
* @brief Creates a new MQTT connection and initializes its members. | |
* | |
* @param[in] awsIotMqttMode Specifies if this connection is to an AWS IoT MQTT server. | |
* @param[in] pNetworkInfo User-provided network information for the new | |
* connection. | |
* @param[in] keepAliveSeconds User-provided keep-alive interval for the new connection. | |
* | |
* @return Pointer to a newly-created MQTT connection; `NULL` on failure. | |
*/ | |
static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode, | |
const IotMqttNetworkInfo_t * pNetworkInfo, | |
uint16_t keepAliveSeconds ); | |
/** | |
* @brief Destroys the members of an MQTT connection. | |
* | |
* @param[in] pMqttConnection Which connection to destroy. | |
*/ | |
static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection ); | |
/** | |
* @brief Common setup function for subscribe and unsubscribe operations. | |
* | |
* See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a | |
* description of the parameters and return values. | |
*/ | |
static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation, | |
IotMqttConnection_t mqttConnection, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
IotMqttOperation_t * const pOperationReference ); | |
/** | |
* @brief Utility function for creating and serializing subscription requests.
* | |
* See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a | |
* description of the parameters and return values. | |
*/ | |
static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t operation, | |
IotMqttConnection_t mqttConnection, | |
IotMqttSerializeSubscribe_t serializeSubscription, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
_mqttOperation_t ** ppSubscriptionOperation ); | |
/** | |
* @brief The common component of both @ref mqtt_function_subscribeasync and @ref | |
* mqtt_function_unsubscribeasync. | |
* | |
* See @ref mqtt_function_subscribeasync or @ref mqtt_function_unsubscribeasync for a | |
* description of the parameters and return values. | |
*/ | |
static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation, | |
IotMqttConnection_t mqttConnection, | |
IotMqttSerializeSubscribe_t serializeSubscription, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
IotMqttOperation_t * const pOperationReference ); | |
/** | |
* @cond DOXYGEN_IGNORE | |
* Doxygen should ignore this section. | |
* | |
* Declaration of local MQTT serializer override selectors | |
*/ | |
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializePingreq_t, | |
_getMqttPingreqSerializer, | |
_IotMqtt_SerializePingreq, | |
serialize.pingreq ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqtt_SerializePublish_t, | |
_getMqttPublishSerializer, | |
_IotMqtt_SerializePublish, | |
serialize.publish ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t, | |
_getMqttFreePacketFunc, | |
_IotMqtt_FreePacket, | |
freePacket ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeSubscribe_t, | |
_getMqttSubscribeSerializer, | |
_IotMqtt_SerializeSubscribe, | |
serialize.subscribe ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeSubscribe_t, | |
_getMqttUnsubscribeSerializer, | |
_IotMqtt_SerializeUnsubscribe, | |
serialize.unsubscribe ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeConnect_t, | |
_getMqttConnectSerializer, | |
_IotMqtt_SerializeConnect, | |
serialize.connect ) | |
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializeDisconnect_t, | |
_getMqttDisconnectSerializer, | |
_IotMqtt_SerializeDisconnect, | |
serialize.disconnect ) | |
#else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
#define _getMqttPingreqSerializer( pSerializer ) _IotMqtt_SerializePingreq | |
#define _getMqttPublishSerializer( pSerializer ) _IotMqtt_SerializePublish | |
#define _getMqttFreePacketFunc( pSerializer ) _IotMqtt_FreePacket | |
#define _getMqttSubscribeSerializer( pSerializer ) _IotMqtt_SerializeSubscribe | |
#define _getMqttUnsubscribeSerializer( pSerializer ) _IotMqtt_SerializeUnsubscribe | |
#define _getMqttConnectSerializer( pSerializer ) _IotMqtt_SerializeConnect | |
#define _getMqttDisconnectSerializer( pSerializer ) _IotMqtt_SerializeDisconnect | |
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
/** @endcond */ | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief Tracks whether @ref mqtt_function_init has been called. | |
* | |
* API functions will fail if @ref mqtt_function_init was not called. | |
*/ | |
static volatile uint32_t _initCalled = MQTT_LIBRARY_UNINITIALIZED; | |
/*-----------------------------------------------------------*/ | |
static bool _checkInit( void ) | |
{ | |
bool status = true; | |
if( _initCalled == MQTT_LIBRARY_UNINITIALIZED ) | |
{ | |
IotLogError( "IotMqtt_Init was not called." ); | |
status = false; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
static bool _mqttSubscription_setUnsubscribe( const IotLink_t * pSubscriptionLink, | |
void * pMatch ) | |
{ | |
/* Because this function is called from a container function, the given link | |
* must never be NULL. */ | |
IotMqtt_Assert( pSubscriptionLink != NULL ); | |
_mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t, | |
pSubscriptionLink, | |
link ); | |
/* Silence warnings about unused parameters. */ | |
( void ) pMatch; | |
/* Set the unsubscribed flag. */ | |
pSubscription->unsubscribed = true; | |
return true; | |
} | |
/*-----------------------------------------------------------*/ | |
static void _mqttSubscription_tryDestroy( void * pData ) | |
{ | |
_mqttSubscription_t * pSubscription = ( _mqttSubscription_t * ) pData; | |
/* Reference count must not be negative. */ | |
IotMqtt_Assert( pSubscription->references >= 0 ); | |
/* Unsubscribed flag should be set. */ | |
IotMqtt_Assert( pSubscription->unsubscribed == true ); | |
/* Free the subscription if it has no references. */ | |
if( pSubscription->references == 0 ) | |
{ | |
IotMqtt_FreeSubscription( pSubscription ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/*-----------------------------------------------------------*/ | |
static void _mqttOperation_tryDestroy( void * pData ) | |
{ | |
_mqttOperation_t * pOperation = ( _mqttOperation_t * ) pData; | |
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
/* Incoming PUBLISH operations may always be freed. */ | |
if( pOperation->incomingPublish == true ) | |
{ | |
/* Cancel the incoming PUBLISH operation's job. */ | |
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, | |
pOperation->job, | |
NULL ); | |
/* If the operation's job was not canceled, it must be already executing. | |
* Any other return value is invalid. */ | |
IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) || | |
( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) ); | |
/* Check if the incoming PUBLISH job was canceled. */ | |
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) | |
{ | |
/* Job was canceled. Process incoming PUBLISH now to clean up. */ | |
_IotMqtt_ProcessIncomingPublish( IOT_SYSTEM_TASKPOOL, | |
pOperation->job, | |
pOperation ); | |
} | |
else | |
{ | |
/* The executing job will process the PUBLISH, so nothing is done here. */ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
/* Decrement reference count and destroy operation if possible. */ | |
if( _IotMqtt_DecrementOperationReferences( pOperation, true ) == true ) | |
{ | |
_IotMqtt_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
} | |
/*-----------------------------------------------------------*/ | |
static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo, | |
uint16_t keepAliveSeconds, | |
_mqttConnection_t * pMqttConnection ) | |
{ | |
bool status = true; | |
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS; | |
IotTaskPoolError_t jobStatus = IOT_TASKPOOL_SUCCESS; | |
/* Network information is not used when MQTT packet serializers are disabled. */ | |
( void ) pNetworkInfo; | |
/* Set PINGREQ operation members. */ | |
pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ; | |
/* Convert the keep-alive interval to milliseconds. */ | |
pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000; | |
pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000; | |
/* Generate a PINGREQ packet. */ | |
serializeStatus = _getMqttPingreqSerializer( pMqttConnection->pSerializer )( &( pMqttConnection->pingreq.u.operation.pMqttPacket ), | |
&( pMqttConnection->pingreq.u.operation.packetSize ) ); | |
if( serializeStatus != IOT_MQTT_SUCCESS ) | |
{ | |
IotLogError( "Failed to allocate PINGREQ packet for new connection." ); | |
status = false; | |
} | |
else | |
{ | |
/* Create the task pool job that processes keep-alive. */ | |
jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive, | |
pMqttConnection, | |
&( pMqttConnection->pingreq.jobStorage ), | |
&( pMqttConnection->pingreq.job ) ); | |
/* Task pool job creation for a pre-allocated job should never fail. | |
* Abort the program if it does. */ | |
if( jobStatus != IOT_TASKPOOL_SUCCESS ) | |
{ | |
IotLogError( "Failed to create keep-alive job for new connection." ); | |
IotMqtt_Assert( false ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Keep-alive references its MQTT connection, so increment reference. */ | |
( pMqttConnection->references )++; | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
static IotNetworkError_t _createNetworkConnection( const IotMqttNetworkInfo_t * pNetworkInfo, | |
IotNetworkConnection_t * pNetworkConnection, | |
bool * pCreatedNewNetworkConnection ) | |
{ | |
IOT_FUNCTION_ENTRY( IotNetworkError_t, IOT_NETWORK_SUCCESS ); | |
/* Network info must not be NULL. */ | |
if( pNetworkInfo == NULL ) | |
{ | |
IotLogError( "Network information cannot be NULL." ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_NETWORK_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Create a new network connection if requested. Otherwise, copy the existing | |
* network connection. */ | |
if( pNetworkInfo->createNetworkConnection == true ) | |
{ | |
status = pNetworkInfo->pNetworkInterface->create( pNetworkInfo->u.setup.pNetworkServerInfo, | |
pNetworkInfo->u.setup.pNetworkCredentialInfo, | |
pNetworkConnection ); | |
if( status == IOT_NETWORK_SUCCESS ) | |
{ | |
/* This MQTT connection owns the network connection it created and | |
* should destroy it on cleanup. */ | |
*pCreatedNewNetworkConnection = true; | |
} | |
else | |
{ | |
IotLogError( "Failed to create network connection: %d", status ); | |
IOT_GOTO_CLEANUP(); | |
} | |
} | |
else | |
{ | |
/* A connection already exists; the caller should not destroy | |
* it on cleanup. */ | |
*pNetworkConnection = pNetworkInfo->u.pNetworkConnection; | |
*pCreatedNewNetworkConnection = false; | |
} | |
IOT_FUNCTION_EXIT_NO_CLEANUP(); | |
} | |
/*-----------------------------------------------------------*/ | |
static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode, | |
const IotMqttNetworkInfo_t * pNetworkInfo, | |
uint16_t keepAliveSeconds ) | |
{ | |
IOT_FUNCTION_ENTRY( bool, true ); | |
_mqttConnection_t * pMqttConnection = NULL; | |
bool referencesMutexCreated = false, subscriptionMutexCreated = false; | |
/* Allocate memory for the new MQTT connection. */ | |
pMqttConnection = IotMqtt_MallocConnection( sizeof( _mqttConnection_t ) ); | |
if( pMqttConnection == NULL ) | |
{ | |
IotLogError( "Failed to allocate memory for new connection." ); | |
IOT_SET_AND_GOTO_CLEANUP( false ); | |
} | |
else | |
{ | |
/* Clear the MQTT connection, then copy the MQTT server mode, network | |
* interface, and disconnect callback. */ | |
( void ) memset( pMqttConnection, 0x00, sizeof( _mqttConnection_t ) ); | |
pMqttConnection->awsIotMqttMode = awsIotMqttMode; | |
pMqttConnection->pNetworkInterface = pNetworkInfo->pNetworkInterface; | |
pMqttConnection->disconnectCallback = pNetworkInfo->disconnectCallback; | |
/* Start a new MQTT connection with a reference count of 1. */ | |
pMqttConnection->references = 1; | |
} | |
/* Create the references mutex for a new connection. It is a recursive mutex. */ | |
referencesMutexCreated = IotMutex_Create( &( pMqttConnection->referencesMutex ), true ); | |
if( referencesMutexCreated == false ) | |
{ | |
IotLogError( "Failed to create references mutex for new connection." ); | |
IOT_SET_AND_GOTO_CLEANUP( false ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Create the subscription mutex for a new connection. */ | |
subscriptionMutexCreated = IotMutex_Create( &( pMqttConnection->subscriptionMutex ), false ); | |
if( subscriptionMutexCreated == false ) | |
{ | |
IotLogError( "Failed to create subscription mutex for new connection." ); | |
IOT_SET_AND_GOTO_CLEANUP( false ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Create the new connection's subscription and operation lists. */ | |
IotListDouble_Create( &( pMqttConnection->subscriptionList ) ); | |
IotListDouble_Create( &( pMqttConnection->pendingProcessing ) ); | |
IotListDouble_Create( &( pMqttConnection->pendingResponse ) ); | |
/* Check if keep-alive is active for this connection. */ | |
if( keepAliveSeconds != 0 ) | |
{ | |
if( _createKeepAliveOperation( pNetworkInfo, | |
keepAliveSeconds, | |
pMqttConnection ) == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( false ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Clean up mutexes and connection if this function failed. */ | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
if( status == false ) | |
{ | |
if( subscriptionMutexCreated == true ) | |
{ | |
IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( referencesMutexCreated == true ) | |
{ | |
IotMutex_Destroy( &( pMqttConnection->referencesMutex ) ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( pMqttConnection != NULL ) | |
{ | |
IotMqtt_FreeConnection( pMqttConnection ); | |
pMqttConnection = NULL; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return pMqttConnection; | |
} | |
/*-----------------------------------------------------------*/ | |
/* Tear down an MQTT connection: release keep-alive resources, remove all
 * subscriptions, destroy an owned network connection, destroy the mutexes
 * and free the connection memory. */
static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
{
    IotNetworkError_t destroyStatus = IOT_NETWORK_SUCCESS;

    /* A nonzero keep-alive period means keep-alive resources are still held. */
    if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
    {
        IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );

        /* Free the PINGREQ packet. */
        _getMqttFreePacketFunc( pMqttConnection->pSerializer )( pMqttConnection->pingreq.u.operation.pMqttPacket );

        /* Reset the keep-alive bookkeeping. */
        pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
        pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
        pMqttConnection->pingreq.u.operation.packetSize = 0;

        /* Drop the reference held on behalf of the keep-alive. */
        pMqttConnection->references--;
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* At this point the connection has no keep-alive and no more than one
     * reference. */
    IotMqtt_Assert( pMqttConnection->references <= 1 );
    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );
    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );
    IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );

    /* Empty the subscription list while holding the subscription mutex.
     * Every element is flagged as unsubscribed and then freed if it is
     * unreferenced. */
    IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
    IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
                                    _mqttSubscription_setUnsubscribe,
                                    NULL,
                                    _mqttSubscription_tryDestroy,
                                    offsetof( _mqttSubscription_t, link ) );
    IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );

    /* Only a network connection owned by this MQTT connection is destroyed. */
    if( pMqttConnection->ownNetworkConnection == true )
    {
        destroyStatus = pMqttConnection->pNetworkInterface->destroy( pMqttConnection->pNetworkConnection );

        if( destroyStatus == IOT_NETWORK_SUCCESS )
        {
            IotLogInfo( "(MQTT connection %p) Network connection destroyed.",
                        pMqttConnection );
        }
        else
        {
            IotLogWarn( "(MQTT connection %p) Failed to destroy network connection.",
                        pMqttConnection );
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Release both mutexes. */
    IotMutex_Destroy( &( pMqttConnection->referencesMutex ) );
    IotMutex_Destroy( &( pMqttConnection->subscriptionMutex ) );

    IotLogDebug( "(MQTT connection %p) Connection destroyed.", pMqttConnection );

    /* Finally, release the connection memory itself. */
    IotMqtt_FreeConnection( pMqttConnection );
}
/*-----------------------------------------------------------*/ | |
static IotMqttError_t _subscriptionCommonSetup( IotMqttOperationType_t operation, | |
IotMqttConnection_t mqttConnection, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
IotMqttOperation_t * const pOperationReference ) | |
{ | |
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
/* This function should only be called for subscribe or unsubscribe. */ | |
IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) || | |
( operation == IOT_MQTT_UNSUBSCRIBE ) ); | |
/* Check that IotMqtt_Init was called. */ | |
if( _checkInit() == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check that all elements in the subscription list are valid. */ | |
if( _IotMqtt_ValidateSubscriptionList( operation, | |
mqttConnection->awsIotMqttMode, | |
pSubscriptionList, | |
subscriptionCount ) == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check that a reference pointer is provided for a waitable operation. */ | |
if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ) | |
{ | |
if( pOperationReference == NULL ) | |
{ | |
IotLogError( "Reference must be provided for a waitable %s.", | |
IotMqtt_OperationType( operation ) ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IOT_FUNCTION_EXIT_NO_CLEANUP(); | |
} | |
/*-----------------------------------------------------------*/ | |
static IotMqttError_t _subscriptionCreateAndSerialize( IotMqttOperationType_t operation, | |
IotMqttConnection_t mqttConnection, | |
IotMqttSerializeSubscribe_t serializeSubscription, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
_mqttOperation_t ** ppSubscriptionOperation ) | |
{ | |
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
_mqttOperation_t * pSubscriptionOperation = NULL; | |
/* Create a subscription operation. */ | |
status = _IotMqtt_CreateOperation( mqttConnection, | |
flags, | |
pCallbackInfo, | |
ppSubscriptionOperation ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
pSubscriptionOperation = ( *ppSubscriptionOperation ); | |
} | |
/* Check the subscription operation data and set the operation type. */ | |
IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); | |
IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 ); | |
pSubscriptionOperation->u.operation.type = operation; | |
/* Generate a subscription packet from the subscription list. */ | |
status = serializeSubscription( pSubscriptionList, | |
subscriptionCount, | |
&( pSubscriptionOperation->u.operation.pMqttPacket ), | |
&( pSubscriptionOperation->u.operation.packetSize ), | |
&( pSubscriptionOperation->u.operation.packetIdentifier ) ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check the serialized MQTT packet. */ | |
IotMqtt_Assert( pSubscriptionOperation->u.operation.pMqttPacket != NULL ); | |
IotMqtt_Assert( pSubscriptionOperation->u.operation.packetSize > 0 ); | |
IOT_FUNCTION_EXIT_NO_CLEANUP(); | |
} | |
/*-----------------------------------------------------------*/ | |
static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation, | |
IotMqttConnection_t mqttConnection, | |
IotMqttSerializeSubscribe_t serializeSubscription, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
IotMqttOperation_t * const pOperationReference ) | |
{ | |
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
_mqttOperation_t * pSubscriptionOperation = NULL; | |
/* Create and serialize the subscription operation. */ | |
status = _subscriptionCreateAndSerialize( operation, | |
mqttConnection, | |
serializeSubscription, | |
pSubscriptionList, | |
subscriptionCount, | |
flags, | |
pCallbackInfo, | |
&pSubscriptionOperation ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Add the subscription list for a SUBSCRIBE. */ | |
if( operation == IOT_MQTT_SUBSCRIBE ) | |
{ | |
status = _IotMqtt_AddSubscriptions( mqttConnection, | |
pSubscriptionOperation->u.operation.packetIdentifier, | |
pSubscriptionList, | |
subscriptionCount ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/* Set the reference, if provided. */ | |
if( pOperationReference != NULL ) | |
{ | |
*pOperationReference = pSubscriptionOperation; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Send the SUBSCRIBE packet. */ | |
if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) | |
{ | |
_IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pSubscriptionOperation->job, pSubscriptionOperation ); | |
} | |
else | |
{ | |
status = _IotMqtt_ScheduleOperation( pSubscriptionOperation, | |
_IotMqtt_ProcessSend, | |
0 ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IotLogError( "(MQTT connection %p) Failed to schedule %s for sending.", | |
mqttConnection, | |
IotMqtt_OperationType( operation ) ); | |
if( operation == IOT_MQTT_SUBSCRIBE ) | |
{ | |
_IotMqtt_RemoveSubscriptionByPacket( mqttConnection, | |
pSubscriptionOperation->u.operation.packetIdentifier, | |
MQTT_REMOVE_ALL_SUBSCRIPTIONS ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Clear the previously set (and now invalid) reference. */ | |
if( pOperationReference != NULL ) | |
{ | |
*pOperationReference = IOT_MQTT_OPERATION_INITIALIZER; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IOT_GOTO_CLEANUP(); | |
} | |
} | |
/* Clean up if this function failed. */ | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
if( pSubscriptionOperation != NULL ) | |
{ | |
_IotMqtt_DestroyOperation( pSubscriptionOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
status = IOT_MQTT_STATUS_PENDING; | |
IotLogInfo( "(MQTT connection %p) %s operation scheduled.", | |
mqttConnection, | |
IotMqtt_OperationType( operation ) ); | |
} | |
IOT_FUNCTION_CLEANUP_END(); | |
} | |
/*-----------------------------------------------------------*/ | |
/* Take a reference on an MQTT connection unless it is marked disconnected.
 * Returns true when the reference count was incremented. */
bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection )
{
    bool connectionUsable = false;

    /* The reference count and connection state are read under the
     * references mutex. */
    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

    /* A negative count indicates a programming error. */
    IotMqtt_Assert( pMqttConnection->references >= 0 );

    connectionUsable = ( pMqttConnection->disconnected == false );

    if( connectionUsable == false )
    {
        IotLogWarn( "(MQTT connection %p) Attempt to use closed connection.", pMqttConnection );
    }
    else
    {
        /* Still connected: take the reference. */
        ( pMqttConnection->references )++;

        IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
                     pMqttConnection,
                     ( long int ) pMqttConnection->references - 1,
                     ( long int ) pMqttConnection->references );
    }

    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );

    return connectionUsable;
}
/*-----------------------------------------------------------*/ | |
/* Release one reference on an MQTT connection and destroy the connection
 * when the count reaches zero. */
void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection )
{
    bool noReferencesLeft = false;

    /* The reference count is modified under the references mutex. */
    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

    /* Drop the reference; the resulting count must not be negative. */
    ( pMqttConnection->references )--;
    IotMqtt_Assert( pMqttConnection->references >= 0 );

    IotLogDebug( "(MQTT connection %p) Reference count changed from %ld to %ld.",
                 pMqttConnection,
                 ( long int ) pMqttConnection->references + 1,
                 ( long int ) pMqttConnection->references );

    /* Record the decision while the mutex is still held. */
    if( pMqttConnection->references == 0 )
    {
        noReferencesLeft = true;
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );

    /* The connection is destroyed after the mutex has been released. */
    if( noReferencesLeft == true )
    {
        IotLogDebug( "(MQTT connection %p) Connection will be destroyed now.",
                     pMqttConnection );

        _destroyMqttConnection( pMqttConnection );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }
}
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_Init( void ) | |
{ | |
IotMqttError_t status = IOT_MQTT_SUCCESS; | |
uint32_t allowInitialization = Atomic_CompareAndSwap_u32( &_initCalled, | |
MQTT_LIBRARY_INITIALIZED, | |
MQTT_LIBRARY_UNINITIALIZED ); | |
if( allowInitialization == 1 ) | |
{ | |
/* Call any additional serializer initialization function if serializer | |
* overrides are enabled. */ | |
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 | |
#ifdef _IotMqtt_InitSerializeAdditional | |
if( _IotMqtt_InitSerializeAdditional() == false ) | |
{ | |
/* Log initialization status. */ | |
IotLogError( "Failed to initialize MQTT library serializer. " ); | |
status = IOT_MQTT_INIT_FAILED; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
#endif /* ifdef _IotMqtt_InitSerializeAdditional */ | |
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
if( status == IOT_MQTT_SUCCESS ) | |
{ | |
IotLogInfo( "MQTT library successfully initialized." ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
IotLogWarn( "IotMqtt_Init called with library already initialized." ); | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
void IotMqtt_Cleanup( void ) | |
{ | |
uint32_t allowCleanup = Atomic_CompareAndSwap_u32( &_initCalled, | |
MQTT_LIBRARY_UNINITIALIZED, | |
MQTT_LIBRARY_INITIALIZED ); | |
if( allowCleanup == 1 ) | |
{ | |
/* Call any additional serializer cleanup initialization function if serializer | |
* overrides are enabled. */ | |
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 | |
#ifdef _IotMqtt_CleanupSerializeAdditional | |
_IotMqtt_CleanupSerializeAdditional(); | |
#endif | |
#endif | |
IotLogInfo( "MQTT library cleanup done." ); | |
} | |
else | |
{ | |
IotLogWarn( "IotMqtt_Init was not called before IotMqtt_Cleanup." ); | |
} | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo, | |
const IotMqttConnectInfo_t * pConnectInfo, | |
uint32_t timeoutMs, | |
IotMqttConnection_t * const pMqttConnection ) | |
{ | |
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
bool ownNetworkConnection = false; | |
IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS; | |
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
IotNetworkConnection_t pNetworkConnection = { 0 }; | |
_mqttOperation_t * pOperation = NULL; | |
_mqttConnection_t * pNewMqttConnection = NULL; | |
/* Check that IotMqtt_Init was called. */ | |
if( _checkInit() == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Validate network interface and connect info. */ | |
if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
networkStatus = _createNetworkConnection( pNetworkInfo, | |
&pNetworkConnection, | |
&ownNetworkConnection ); | |
if( networkStatus != IOT_NETWORK_SUCCESS ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotLogInfo( "Establishing new MQTT connection." ); | |
/* Initialize a new MQTT connection object. */ | |
pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode, | |
pNetworkInfo, | |
pConnectInfo->keepAliveSeconds ); | |
if( pNewMqttConnection == NULL ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY ); | |
} | |
else | |
{ | |
/* Set the network connection associated with the MQTT connection. */ | |
pNewMqttConnection->pNetworkConnection = pNetworkConnection; | |
pNewMqttConnection->ownNetworkConnection = ownNetworkConnection; | |
/* Set the MQTT packet serializer overrides. */ | |
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 | |
pNewMqttConnection->pSerializer = pNetworkInfo->pMqttSerializer; | |
#else | |
pNewMqttConnection->pSerializer = NULL; | |
#endif | |
} | |
/* Set the MQTT receive callback. */ | |
networkStatus = pNewMqttConnection->pNetworkInterface->setReceiveCallback( pNetworkConnection, | |
IotMqtt_ReceiveCallback, | |
pNewMqttConnection ); | |
if( networkStatus != IOT_NETWORK_SUCCESS ) | |
{ | |
IotLogError( "Failed to set MQTT network receive callback." ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Create a CONNECT operation. */ | |
status = _IotMqtt_CreateOperation( pNewMqttConnection, | |
IOT_MQTT_FLAG_WAITABLE, | |
NULL, | |
&pOperation ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Ensure the members set by operation creation and serialization | |
* are appropriate for a blocking CONNECT. */ | |
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); | |
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) | |
== IOT_MQTT_FLAG_WAITABLE ); | |
IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 ); | |
/* Set the operation type. */ | |
pOperation->u.operation.type = IOT_MQTT_CONNECT; | |
/* Add previous session subscriptions. */ | |
if( pConnectInfo->pPreviousSubscriptions != NULL ) | |
{ | |
/* Previous subscription count should have been validated as nonzero. */ | |
IotMqtt_Assert( pConnectInfo->previousSubscriptionCount > 0 ); | |
status = _IotMqtt_AddSubscriptions( pNewMqttConnection, | |
2, | |
pConnectInfo->pPreviousSubscriptions, | |
pConnectInfo->previousSubscriptionCount ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Convert the connect info and will info objects to an MQTT CONNECT packet. */ | |
status = _getMqttConnectSerializer( pNetworkInfo->pMqttSerializer )( pConnectInfo, | |
&( pOperation->u.operation.pMqttPacket ), | |
&( pOperation->u.operation.packetSize ) ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check the serialized MQTT packet. */ | |
IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); | |
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); | |
/* Send the CONNECT packet. */ | |
_IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); | |
/* Wait for the CONNECT operation to complete, i.e. wait for CONNACK. */ | |
status = IotMqtt_Wait( pOperation, timeoutMs ); | |
/* The call to wait cleans up the CONNECT operation, so set the pointer | |
* to NULL. */ | |
pOperation = NULL; | |
/* When a connection is successfully established, schedule keep-alive job. */ | |
if( status == IOT_MQTT_SUCCESS ) | |
{ | |
/* Check if a keep-alive job should be scheduled. */ | |
if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 ) | |
{ | |
IotLogDebug( "Scheduling first MQTT keep-alive job." ); | |
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL, | |
pNewMqttConnection->pingreq.job, | |
pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs ); | |
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_SCHEDULING_ERROR ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IotLogError( "Failed to establish new MQTT connection, error %s.", | |
IotMqtt_strerror( status ) ); | |
/* The network connection must be closed if it was created. */ | |
if( ownNetworkConnection == true ) | |
{ | |
networkStatus = pNetworkInfo->pNetworkInterface->close( pNetworkConnection ); | |
if( networkStatus != IOT_NETWORK_SUCCESS ) | |
{ | |
IotLogWarn( "Failed to close network connection." ); | |
} | |
else | |
{ | |
IotLogInfo( "Network connection closed on error." ); | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( pOperation != NULL ) | |
{ | |
_IotMqtt_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( pNewMqttConnection != NULL ) | |
{ | |
_destroyMqttConnection( pNewMqttConnection ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
IotLogInfo( "New MQTT connection %p established.", pMqttConnection ); | |
/* Set the output parameter. */ | |
*pMqttConnection = pNewMqttConnection; | |
} | |
IOT_FUNCTION_CLEANUP_END(); | |
} | |
/*-----------------------------------------------------------*/ | |
/**
 * @brief Close an MQTT connection, optionally sending a DISCONNECT packet
 * first.
 *
 * A DISCONNECT packet is sent only when the library is initialized,
 * #IOT_MQTT_FLAG_CLEANUP_ONLY is not set in `flags`, and the connection is
 * not already marked disconnected. Whenever the library is initialized, the
 * network connection is then closed, the operations in the connection's
 * `pendingProcessing` and `pendingResponse` lists are passed to
 * `_mqttOperation_tryDestroy`, and one connection reference is released.
 *
 * @param[in] mqttConnection The MQTT connection to close.
 * @param[in] flags Only #IOT_MQTT_FLAG_CLEANUP_ONLY is examined by this
 * function.
 */
void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
                         uint32_t flags )
{
    bool alreadyDisconnected = false, libraryInitialized = false;
    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
    _mqttOperation_t * pDisconnectOperation = NULL;

    /* Nothing may be touched unless IotMqtt_Init was called. */
    libraryInitialized = _checkInit();

    if( libraryInitialized != true )
    {
        IOT_GOTO_CLEANUP();
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* The "cleanup only" flag skips the DISCONNECT packet entirely. */
    if( ( flags & IOT_MQTT_FLAG_CLEANUP_ONLY ) == IOT_MQTT_FLAG_CLEANUP_ONLY )
    {
        IOT_GOTO_CLEANUP();
    }

    /* Take a snapshot of the connection status under the mutex. */
    IotMutex_Lock( &( mqttConnection->referencesMutex ) );
    alreadyDisconnected = mqttConnection->disconnected;
    IotMutex_Unlock( &( mqttConnection->referencesMutex ) );

    /* A closed connection cannot carry a DISCONNECT packet. */
    if( alreadyDisconnected == true )
    {
        IOT_GOTO_CLEANUP();
    }

    IotLogInfo( "(MQTT connection %p) Disconnecting connection.", mqttConnection );

    /* The DISCONNECT operation is waitable because this function blocks
     * until the packet has been sent. */
    status = _IotMqtt_CreateOperation( mqttConnection,
                                       IOT_MQTT_FLAG_WAITABLE,
                                       NULL,
                                       &pDisconnectOperation );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }

    /* Sanity-check the freshly created operation for a blocking DISCONNECT. */
    IotMqtt_Assert( pDisconnectOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
    IotMqtt_Assert( ( pDisconnectOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
                    == IOT_MQTT_FLAG_WAITABLE );
    IotMqtt_Assert( pDisconnectOperation->u.operation.periodic.retry.limit == 0 );

    pDisconnectOperation->u.operation.type = IOT_MQTT_DISCONNECT;

    /* Serialize the DISCONNECT packet. */
    status = _getMqttDisconnectSerializer( mqttConnection->pSerializer )( &( pDisconnectOperation->u.operation.pMqttPacket ),
                                                                          &( pDisconnectOperation->u.operation.packetSize ) );

    if( status != IOT_MQTT_SUCCESS )
    {
        IOT_GOTO_CLEANUP();
    }

    /* The serializer must have produced a non-empty packet. */
    IotMqtt_Assert( pDisconnectOperation->u.operation.pMqttPacket != NULL );
    IotMqtt_Assert( pDisconnectOperation->u.operation.packetSize > 0 );

    /* Transmit the packet, then wait briefly for the send to finish. */
    _IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pDisconnectOperation->job, pDisconnectOperation );

    status = IotMqtt_Wait( pDisconnectOperation,
                           IOT_MQTT_RESPONSE_WAIT_MS );

    /* Waiting on a DISCONNECT is only expected to yield SUCCESS, TIMEOUT,
     * or NETWORK ERROR. */
    if( status != IOT_MQTT_SUCCESS )
    {
        IotMqtt_Assert( ( status == IOT_MQTT_TIMEOUT ) ||
                        ( status == IOT_MQTT_NETWORK_ERROR ) );

        IotLogWarn( "(MQTT connection %p) DISCONNECT not sent, error %s.",
                    mqttConnection,
                    IotMqtt_strerror( status ) );
    }
    else
    {
        IotLogInfo( "(MQTT connection %p) Connection disconnected.", mqttConnection );
    }

    /* Every exit path, including the early ones above, converges here. */
    IOT_FUNCTION_CLEANUP_BEGIN();

    if( libraryInitialized == true )
    {
        /* Close the underlying network connection. This also cleans up keep-alive. */
        _IotMqtt_CloseNetworkConnection( IOT_MQTT_DISCONNECT_CALLED,
                                         mqttConnection );

        IotMutex_Lock( &( mqttConnection->referencesMutex ) );

        /* By now the connection is expected to be marked disconnected. */
        IotMqtt_Assert( mqttConnection->disconnected == true );

        /* Try to cancel and destroy every operation still queued on the
         * connection. */
        IotListDouble_RemoveAll( &( mqttConnection->pendingProcessing ),
                                 _mqttOperation_tryDestroy,
                                 offsetof( _mqttOperation_t, link ) );

        IotListDouble_RemoveAll( &( mqttConnection->pendingResponse ),
                                 _mqttOperation_tryDestroy,
                                 offsetof( _mqttOperation_t, link ) );

        IotMutex_Unlock( &( mqttConnection->referencesMutex ) );

        /* Release one connection reference; the connection is destroyed if
         * it was the last one. */
        _IotMqtt_DecrementConnectionReferences( mqttConnection );
    }
}
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_SubscribeAsync( IotMqttConnection_t mqttConnection, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
IotMqttOperation_t * const pSubscribeOperation ) | |
{ | |
IotMqttError_t status = _subscriptionCommonSetup( IOT_MQTT_SUBSCRIBE, | |
mqttConnection, | |
pSubscriptionList, | |
subscriptionCount, | |
flags, | |
pSubscribeOperation ); | |
if( IOT_MQTT_SUCCESS == status ) | |
{ | |
status = _subscriptionCommon( IOT_MQTT_SUBSCRIBE, | |
mqttConnection, | |
_getMqttSubscribeSerializer( mqttConnection->pSerializer ), | |
pSubscriptionList, | |
subscriptionCount, | |
flags, | |
pCallbackInfo, | |
pSubscribeOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_SubscribeSync( IotMqttConnection_t mqttConnection, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
uint32_t timeoutMs ) | |
{ | |
IotMqttError_t status = IOT_MQTT_STATUS_PENDING; | |
IotMqttOperation_t subscribeOperation = IOT_MQTT_OPERATION_INITIALIZER; | |
/* Flags are not used, but the parameter is present for future compatibility. */ | |
( void ) flags; | |
/* Call the asynchronous SUBSCRIBE function. */ | |
status = IotMqtt_SubscribeAsync( mqttConnection, | |
pSubscriptionList, | |
subscriptionCount, | |
IOT_MQTT_FLAG_WAITABLE | MQTT_INTERNAL_FLAG_BLOCK_ON_SEND, | |
NULL, | |
&subscribeOperation ); | |
/* Wait for the SUBSCRIBE operation to complete. */ | |
if( status == IOT_MQTT_STATUS_PENDING ) | |
{ | |
status = IotMqtt_Wait( subscribeOperation, timeoutMs ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Ensure that a status was set. */ | |
IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING ); | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_UnsubscribeAsync( IotMqttConnection_t mqttConnection, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
IotMqttOperation_t * const pUnsubscribeOperation ) | |
{ | |
IotMqttError_t status = _subscriptionCommonSetup( IOT_MQTT_UNSUBSCRIBE, | |
mqttConnection, | |
pSubscriptionList, | |
subscriptionCount, | |
flags, | |
pUnsubscribeOperation ); | |
if( IOT_MQTT_SUCCESS == status ) | |
{ | |
/* Remove the MQTT subscription list for an UNSUBSCRIBE. */ | |
_IotMqtt_RemoveSubscriptionByTopicFilter( mqttConnection, | |
pSubscriptionList, | |
subscriptionCount ); | |
status = _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE, | |
mqttConnection, | |
_getMqttUnsubscribeSerializer( mqttConnection->pSerializer ), | |
pSubscriptionList, | |
subscriptionCount, | |
flags, | |
pCallbackInfo, | |
pUnsubscribeOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_UnsubscribeSync( IotMqttConnection_t mqttConnection, | |
const IotMqttSubscription_t * pSubscriptionList, | |
size_t subscriptionCount, | |
uint32_t flags, | |
uint32_t timeoutMs ) | |
{ | |
IotMqttError_t status = IOT_MQTT_STATUS_PENDING; | |
IotMqttOperation_t unsubscribeOperation = IOT_MQTT_OPERATION_INITIALIZER; | |
/* Flags are not used, but the parameter is present for future compatibility. */ | |
( void ) flags; | |
/* Call the asynchronous UNSUBSCRIBE function. */ | |
status = IotMqtt_UnsubscribeAsync( mqttConnection, | |
pSubscriptionList, | |
subscriptionCount, | |
IOT_MQTT_FLAG_WAITABLE | MQTT_INTERNAL_FLAG_BLOCK_ON_SEND, | |
NULL, | |
&unsubscribeOperation ); | |
/* Wait for the UNSUBSCRIBE operation to complete. */ | |
if( status == IOT_MQTT_STATUS_PENDING ) | |
{ | |
status = IotMqtt_Wait( unsubscribeOperation, timeoutMs ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Ensure that a status was set. */ | |
IotMqtt_Assert( status != IOT_MQTT_STATUS_PENDING ); | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_PublishAsync( IotMqttConnection_t mqttConnection, | |
const IotMqttPublishInfo_t * pPublishInfo, | |
uint32_t flags, | |
const IotMqttCallbackInfo_t * pCallbackInfo, | |
IotMqttOperation_t * const pPublishOperation ) | |
{ | |
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
_mqttOperation_t * pOperation = NULL; | |
uint8_t ** pPacketIdentifierHigh = NULL; | |
/* Check that IotMqtt_Init was called. */ | |
if( _checkInit() == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check that the PUBLISH information is valid. */ | |
if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode, | |
pPublishInfo ) == false ) | |
{ | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check that no notification is requested for a QoS 0 publish. */ | |
if( pPublishInfo->qos == IOT_MQTT_QOS_0 ) | |
{ | |
if( pCallbackInfo != NULL ) | |
{ | |
IotLogError( "QoS 0 PUBLISH should not have notification parameters set." ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 ) | |
{ | |
IotLogError( "QoS 0 PUBLISH should not have notification parameters set." ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
if( pPublishOperation != NULL ) | |
{ | |
IotLogWarn( "Ignoring reference parameter for QoS 0 publish." ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check that a reference pointer is provided for a waitable operation. */ | |
if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE ) | |
{ | |
if( pPublishOperation == NULL ) | |
{ | |
IotLogError( "Reference must be provided for a waitable PUBLISH." ); | |
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Create a PUBLISH operation. */ | |
status = _IotMqtt_CreateOperation( mqttConnection, | |
flags, | |
pCallbackInfo, | |
&pOperation ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check the PUBLISH operation data and set the operation type. */ | |
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING ); | |
pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER; | |
/* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */ | |
if( mqttConnection->awsIotMqttMode == true ) | |
{ | |
pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Generate a PUBLISH packet from pPublishInfo. */ | |
status = _getMqttPublishSerializer( mqttConnection->pSerializer )( pPublishInfo, | |
&( pOperation->u.operation.pMqttPacket ), | |
&( pOperation->u.operation.packetSize ), | |
&( pOperation->u.operation.packetIdentifier ), | |
pPacketIdentifierHigh ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Check the serialized MQTT packet. */ | |
IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL ); | |
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 ); | |
/* Initialize PUBLISH retry if retryLimit is set. */ | |
if( pPublishInfo->retryLimit > 0 ) | |
{ | |
/* A QoS 0 PUBLISH may not be retried. */ | |
if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) | |
{ | |
pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit; | |
pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Set the reference, if provided. */ | |
if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) | |
{ | |
if( pPublishOperation != NULL ) | |
{ | |
*pPublishOperation = pOperation; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Send the PUBLISH packet. */ | |
if( ( flags & MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) == MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ) | |
{ | |
_IotMqtt_ProcessSend( IOT_SYSTEM_TASKPOOL, pOperation->job, pOperation ); | |
} | |
else | |
{ | |
status = _IotMqtt_ScheduleOperation( pOperation, | |
_IotMqtt_ProcessSend, | |
0 ); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.", | |
mqttConnection ); | |
/* Clear the previously set (and now invalid) reference. */ | |
if( pPublishInfo->qos != IOT_MQTT_QOS_0 ) | |
{ | |
if( pPublishOperation != NULL ) | |
{ | |
*pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
/* Clean up the PUBLISH operation if this function fails. Otherwise, set the | |
* appropriate return code based on QoS. */ | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
if( status != IOT_MQTT_SUCCESS ) | |
{ | |
if( pOperation != NULL ) | |
{ | |
_IotMqtt_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
if( pPublishInfo->qos > IOT_MQTT_QOS_0 ) | |
{ | |
status = IOT_MQTT_STATUS_PENDING; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.", | |
mqttConnection ); | |
} | |
IOT_FUNCTION_CLEANUP_END(); | |
} | |
/*-----------------------------------------------------------*/ | |
IotMqttError_t IotMqtt_PublishSync( IotMqttConnection_t mqttConnection, | |
const IotMqttPublishInfo_t * pPublishInfo, | |
uint32_t flags, | |
uint32_t timeoutMs ) | |
{ | |
IotMqttError_t status = IOT_MQTT_STATUS_PENDING; | |
IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER, | |
* pPublishOperation = NULL; | |
/* Clear the flags, setting only the "serial" flag. */ | |
flags = MQTT_INTERNAL_FLAG_BLOCK_ON_SEND; | |
/* Set the waitable flag and reference for QoS 1 PUBLISH. */ | |
if( pPublishInfo->qos == IOT_MQTT_QOS_1 ) | |
{ | |
flags |= IOT_MQTT_FLAG_WAITABLE; | |
pPublishOperation = &publishOperation; | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
/* Call the asynchronous PUBLISH function. */ | |
status = IotMqtt_PublishAsync( mqttConnection, | |
pPublishInfo, | |
flags, | |
NULL, | |
pPublishOperation ); | |
/* Wait for a queued QoS 1 PUBLISH to complete. */ | |
if( pPublishInfo->qos == IOT_MQTT_QOS_1 ) | |
{ | |
if( status == IOT_MQTT_STATUS_PENDING ) | |
{ | |
status = IotMqtt_Wait( publishOperation, timeoutMs ); | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
} | |
else | |
{ | |
EMPTY_ELSE_MARKER; | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
/**
 * @brief Block until a waitable MQTT operation completes or a timeout
 * expires.
 *
 * The operation handle is validated before any of its members are read. If
 * the owning connection is already marked disconnected, the wait is skipped.
 * On timeout, the operation's job is cancelled where possible, and for a
 * SUBSCRIBE the subscriptions registered under its packet identifier are
 * removed. After a valid operation has been processed, this function
 * releases its reference and destroys the operation when that was the last
 * one.
 *
 * @param[in] operation Reference to the operation to wait for.
 * @param[in] timeoutMs Timeout, in milliseconds, for the wait.
 *
 * @return The completed operation's status; #IOT_MQTT_TIMEOUT if the wait
 * timed out; #IOT_MQTT_NETWORK_ERROR if the connection is closed;
 * #IOT_MQTT_BAD_PARAMETER if `operation` fails validation;
 * #IOT_MQTT_NOT_INITIALIZED if the library is not initialized.
 */
IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,
                             uint32_t timeoutMs )
{
    IotMqttError_t status = IOT_MQTT_SUCCESS;
    _mqttConnection_t * pMqttConnection = NULL;

    /* Check that IotMqtt_Init was called. */
    if( _checkInit() == false )
    {
        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NOT_INITIALIZED );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Validate the given operation reference. */
    if( _IotMqtt_ValidateOperation( operation ) == false )
    {
        status = IOT_MQTT_BAD_PARAMETER;
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Check the MQTT connection status. */
    if( status == IOT_MQTT_SUCCESS )
    {
        /* Read from the operation only after it has passed validation; a
         * rejected handle must never be dereferenced. */
        pMqttConnection = operation->pMqttConnection;

        IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

        if( pMqttConnection->disconnected == true )
        {
            IotLogError( "(MQTT connection %p, %s operation %p) MQTT connection is closed. "
                         "Operation cannot be waited on.",
                         pMqttConnection,
                         IotMqtt_OperationType( operation->u.operation.type ),
                         operation );

            status = IOT_MQTT_NETWORK_ERROR;
        }
        else
        {
            IotLogInfo( "(MQTT connection %p, %s operation %p) Waiting for operation completion.",
                        pMqttConnection,
                        IotMqtt_OperationType( operation->u.operation.type ),
                        operation );
        }

        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );

        /* Only wait on an operation if the MQTT connection is active. */
        if( status == IOT_MQTT_SUCCESS )
        {
            if( IotSemaphore_TimedWait( &( operation->u.operation.notify.waitSemaphore ),
                                        timeoutMs ) == false )
            {
                status = IOT_MQTT_TIMEOUT;

                /* Attempt to cancel the job of the timed out operation. */
                ( void ) _IotMqtt_DecrementOperationReferences( operation, true );

                /* Clean up lingering subscriptions from a timed-out SUBSCRIBE. */
                if( operation->u.operation.type == IOT_MQTT_SUBSCRIBE )
                {
                    IotLogDebug( "(MQTT connection %p, SUBSCRIBE operation %p) Cleaning up"
                                 " subscriptions of timed-out SUBSCRIBE.",
                                 pMqttConnection,
                                 operation );

                    _IotMqtt_RemoveSubscriptionByPacket( pMqttConnection,
                                                         operation->u.operation.packetIdentifier,
                                                         MQTT_REMOVE_ALL_SUBSCRIPTIONS );
                }
                else
                {
                    EMPTY_ELSE_MARKER;
                }
            }
            else
            {
                /* Retrieve the status of the completed operation. */
                status = operation->u.operation.status;
            }

            IotLogInfo( "(MQTT connection %p, %s operation %p) Wait complete with result %s.",
                        pMqttConnection,
                        IotMqtt_OperationType( operation->u.operation.type ),
                        operation,
                        IotMqtt_strerror( status ) );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        /* Wait is finished; decrement operation reference count. */
        if( _IotMqtt_DecrementOperationReferences( operation, false ) == true )
        {
            _IotMqtt_DestroyOperation( operation );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/ | |
/**
 * @brief Map an MQTT status code to a constant, human-readable string.
 *
 * @param[in] status The status code to describe.
 *
 * @return A string literal naming `status`, or "INVALID STATUS" when the
 * value matches none of the handled codes. The result is never `NULL`.
 */
const char * IotMqtt_strerror( IotMqttError_t status )
{
    switch( status )
    {
        case IOT_MQTT_SUCCESS:
            return "SUCCESS";

        case IOT_MQTT_STATUS_PENDING:
            return "PENDING";

        case IOT_MQTT_INIT_FAILED:
            return "INITIALIZATION FAILED";

        case IOT_MQTT_BAD_PARAMETER:
            return "BAD PARAMETER";

        case IOT_MQTT_NO_MEMORY:
            return "NO MEMORY";

        case IOT_MQTT_NETWORK_ERROR:
            return "NETWORK ERROR";

        case IOT_MQTT_SCHEDULING_ERROR:
            return "SCHEDULING ERROR";

        case IOT_MQTT_BAD_RESPONSE:
            return "BAD RESPONSE RECEIVED";

        case IOT_MQTT_TIMEOUT:
            return "TIMEOUT";

        case IOT_MQTT_SERVER_REFUSED:
            return "SERVER REFUSED";

        case IOT_MQTT_RETRY_NO_RESPONSE:
            return "NO RESPONSE";

        case IOT_MQTT_NOT_INITIALIZED:
            return "NOT INITIALIZED";

        default:
            break;
    }

    /* Any value not handled above is reported as invalid. */
    return "INVALID STATUS";
}
/*-----------------------------------------------------------*/ | |
/**
 * @brief Map an MQTT operation type to a constant, human-readable string.
 *
 * @param[in] operation The operation type to describe.
 *
 * @return A string literal naming `operation`, or "INVALID OPERATION" when
 * the value matches none of the handled types. The result is never `NULL`.
 */
const char * IotMqtt_OperationType( IotMqttOperationType_t operation )
{
    switch( operation )
    {
        case IOT_MQTT_CONNECT:
            return "CONNECT";

        case IOT_MQTT_PUBLISH_TO_SERVER:
            return "PUBLISH";

        case IOT_MQTT_PUBACK:
            return "PUBACK";

        case IOT_MQTT_SUBSCRIBE:
            return "SUBSCRIBE";

        case IOT_MQTT_UNSUBSCRIBE:
            return "UNSUBSCRIBE";

        case IOT_MQTT_PINGREQ:
            return "PINGREQ";

        case IOT_MQTT_DISCONNECT:
            return "DISCONNECT";

        default:
            break;
    }

    /* Any value not handled above is reported as invalid. */
    return "INVALID OPERATION";
}
/*-----------------------------------------------------------*/ | |
/* Provide access to internal functions and variables if testing. */ | |
#if IOT_BUILD_TESTS == 1 | |
#include "iot_test_access_mqtt_api.c" | |
#endif |