| /* | |
| * IoT MQTT V2.1.0 | |
| * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
| * | |
| * Permission is hereby granted, free of charge, to any person obtaining a copy of | |
| * this software and associated documentation files (the "Software"), to deal in | |
| * the Software without restriction, including without limitation the rights to | |
| * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of | |
| * the Software, and to permit persons to whom the Software is furnished to do so, | |
| * subject to the following conditions: | |
| * | |
| * The above copyright notice and this permission notice shall be included in all | |
| * copies or substantial portions of the Software. | |
| * | |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS | |
| * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
| * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER | |
| * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
| * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
| */ | |
| /** | |
| * @file iot_mqtt_network.c | |
| * @brief Implements functions involving transport layer connections. | |
| */ | |
| /* The config header is always included first. */ | |
| #include "iot_config.h" | |
| /* Standard includes. */ | |
| #include <string.h> | |
| /* Error handling include. */ | |
| #include "iot_error.h" | |
| /* MQTT internal include. */ | |
| #include "private/iot_mqtt_internal.h" | |
| /* Platform layer includes. */ | |
| #include "platform/iot_threads.h" | |
| /* Atomics include. */ | |
| #include "iot_atomic.h" | |
| /*-----------------------------------------------------------*/ | |
| /** | |
| * @brief Check if an incoming packet type is valid. | |
| * | |
| * @param[in] packetType The packet type to check. | |
| * | |
| * @return `true` if the packet type is valid; `false` otherwise. | |
| */ | |
| static bool _incomingPacketValid( uint8_t packetType ); | |
| /** | |
| * @brief Get an incoming MQTT packet from the network. | |
| * | |
| * @param[in] pNetworkConnection Network connection to use for receive, which | |
| * may be different from the network connection associated with the MQTT connection. | |
| * @param[in] pMqttConnection The associated MQTT connection. | |
| * @param[out] pIncomingPacket Output parameter for the incoming packet. | |
| * | |
| * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE. | |
| */ | |
| static IotMqttError_t _getIncomingPacket( void * pNetworkConnection, | |
| const _mqttConnection_t * pMqttConnection, | |
| _mqttPacket_t * pIncomingPacket ); | |
| /** | |
| * @brief Deserialize a packet received from the network. | |
| * | |
| * @param[in] pMqttConnection The associated MQTT connection. | |
| * @param[in] pIncomingPacket The packet received from the network. | |
| * | |
| * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR, | |
| * #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED. | |
| */ | |
| static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection, | |
| _mqttPacket_t * pIncomingPacket ); | |
| /** | |
| * @brief Send a PUBACK for a received QoS 1 PUBLISH packet. | |
| * | |
| * @param[in] pMqttConnection Which connection the PUBACK should be sent over. | |
| * @param[in] packetIdentifier Which packet identifier to include in PUBACK. | |
| */ | |
| static void _sendPuback( _mqttConnection_t * pMqttConnection, | |
| uint16_t packetIdentifier ); | |
| /** | |
| * @brief Flush a packet from the stream of incoming data. | |
| * | |
| * This function is called when memory for a packet cannot be allocated. The | |
| * packet is flushed from the stream of incoming data so that the next packet | |
| * may be read. | |
| * | |
| * @param[in] pNetworkConnection Network connection to use for receive, which | |
| * may be different from the network connection associated with the MQTT connection. | |
| * @param[in] pMqttConnection The associated MQTT connection. | |
| * @param[in] length The length of the packet to flush. | |
| */ | |
| static void _flushPacket( void * pNetworkConnection, | |
| const _mqttConnection_t * pMqttConnection, | |
| size_t length ); | |
| /** | |
| * @cond DOXYGEN_IGNORE | |
| * Doxygen should ignore this section. | |
| * | |
| * Declaration of local MQTT serializer override selectors | |
| */ | |
| #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttGetPacketType_t, | |
| _getPacketTypeFunc, | |
| _IotMqtt_GetPacketType, | |
| getPacketType ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttGetRemainingLength_t, | |
| _getRemainingLengthFunc, | |
| _IotMqtt_GetRemainingLength, | |
| getRemainingLength ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t, | |
| _getConnackDeserializer, | |
| _IotMqtt_DeserializeConnack, | |
| deserialize.connack ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t, | |
| _getPublishDeserializer, | |
| _IotMqtt_DeserializePublish, | |
| deserialize.publish ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t, | |
| _getPubackDeserializer, | |
| _IotMqtt_DeserializePuback, | |
| deserialize.puback ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t, | |
| _getSubackDeserializer, | |
| _IotMqtt_DeserializeSuback, | |
| deserialize.suback ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t, | |
| _getUnsubackDeserializer, | |
| _IotMqtt_DeserializeUnsuback, | |
| deserialize.unsuback ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t, | |
| _getPingrespDeserializer, | |
| _IotMqtt_DeserializePingresp, | |
| deserialize.pingresp ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializePuback_t, | |
| _getMqttPubackSerializer, | |
| _IotMqtt_SerializePuback, | |
| serialize.puback ) | |
| _SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t, | |
| _getMqttFreePacketFunc, | |
| _IotMqtt_FreePacket, | |
| freePacket ) | |
| #else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
| #define _getPacketTypeFunc( pSerializer ) _IotMqtt_GetPacketType | |
| #define _getRemainingLengthFunc( pSerializer ) _IotMqtt_GetRemainingLength | |
| #define _getConnackDeserializer( pSerializer ) _IotMqtt_DeserializeConnack | |
| #define _getPublishDeserializer( pSerializer ) _IotMqtt_DeserializePublish | |
| #define _getPubackDeserializer( pSerializer ) _IotMqtt_DeserializePuback | |
| #define _getSubackDeserializer( pSerializer ) _IotMqtt_DeserializeSuback | |
| #define _getUnsubackDeserializer( pSerializer ) _IotMqtt_DeserializeUnsuback | |
| #define _getPingrespDeserializer( pSerializer ) _IotMqtt_DeserializePingresp | |
| #define _getMqttPubackSerializer( pSerializer ) _IotMqtt_SerializePuback | |
| #define _getMqttFreePacketFunc( pSerializer ) _IotMqtt_FreePacket | |
| #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */ | |
| /** @endcond */ | |
| /*-----------------------------------------------------------*/ | |
| static bool _incomingPacketValid( uint8_t packetType ) | |
| { | |
| bool status = true; | |
| /* Check packet type. Mask out lower bits to ignore flags. */ | |
| switch( packetType & 0xf0 ) | |
| { | |
| /* Valid incoming packet types. */ | |
| case MQTT_PACKET_TYPE_CONNACK: | |
| case MQTT_PACKET_TYPE_PUBLISH: | |
| case MQTT_PACKET_TYPE_PUBACK: | |
| case MQTT_PACKET_TYPE_SUBACK: | |
| case MQTT_PACKET_TYPE_UNSUBACK: | |
| case MQTT_PACKET_TYPE_PINGRESP: | |
| break; | |
| /* Any other packet type is invalid. */ | |
| default: | |
| status = false; | |
| break; | |
| } | |
| return status; | |
| } | |
| /*-----------------------------------------------------------*/ | |
| static IotMqttError_t _getIncomingPacket( void * pNetworkConnection, | |
| const _mqttConnection_t * pMqttConnection, | |
| _mqttPacket_t * pIncomingPacket ) | |
| { | |
| IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS ); | |
| size_t dataBytesRead = 0; | |
| /* No buffer for remaining data should be allocated. */ | |
| IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL ); | |
| IotMqtt_Assert( pIncomingPacket->remainingLength == 0 ); | |
| /* Read the packet type, which is the first byte available. */ | |
| pIncomingPacket->type = _getPacketTypeFunc( pMqttConnection->pSerializer )( pNetworkConnection, | |
| pMqttConnection->pNetworkInterface ); | |
| /* Check that the incoming packet type is valid. */ | |
| if( _incomingPacketValid( pIncomingPacket->type ) == false ) | |
| { | |
| IotLogError( "(MQTT connection %p) Unknown packet type %02x received.", | |
| pMqttConnection, | |
| pIncomingPacket->type ); | |
| IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Read the remaining length. */ | |
| pIncomingPacket->remainingLength = _getRemainingLengthFunc( pMqttConnection->pSerializer )( pNetworkConnection, | |
| pMqttConnection->pNetworkInterface ); | |
| if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID ) | |
| { | |
| IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Allocate a buffer for the remaining data and read the data. */ | |
| if( pIncomingPacket->remainingLength > 0 ) | |
| { | |
| pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength ); | |
| if( pIncomingPacket->pRemainingData == NULL ) | |
| { | |
| IotLogError( "(MQTT connection %p) Failed to allocate buffer of length " | |
| "%lu for incoming packet type %lu.", | |
| pMqttConnection, | |
| ( unsigned long ) pIncomingPacket->remainingLength, | |
| ( unsigned long ) pIncomingPacket->type ); | |
| _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength ); | |
| IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection, | |
| pIncomingPacket->pRemainingData, | |
| pIncomingPacket->remainingLength ); | |
| if( dataBytesRead != pIncomingPacket->remainingLength ) | |
| { | |
| IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Clean up on error. */ | |
| IOT_FUNCTION_CLEANUP_BEGIN(); | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| if( pIncomingPacket->pRemainingData != NULL ) | |
| { | |
| IotMqtt_FreeMessage( pIncomingPacket->pRemainingData ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| IOT_FUNCTION_CLEANUP_END(); | |
| } | |
| /*-----------------------------------------------------------*/ | |
| static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection, | |
| _mqttPacket_t * pIncomingPacket ) | |
| { | |
| IotMqttError_t status = IOT_MQTT_STATUS_PENDING; | |
| _mqttOperation_t * pOperation = NULL; | |
| /* A buffer for remaining data must be allocated if remaining length is not 0. */ | |
| IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) == | |
| ( pIncomingPacket->pRemainingData != NULL ) ); | |
| /* Only valid packets should be given to this function. */ | |
| IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true ); | |
| /* Mask out the low bits of packet type to ignore flags. */ | |
| switch( ( pIncomingPacket->type & 0xf0 ) ) | |
| { | |
| case MQTT_PACKET_TYPE_CONNACK: | |
| IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection ); | |
| /* Deserialize CONNACK and notify of result. */ | |
| status = _getConnackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket ); | |
| pOperation = _IotMqtt_FindOperation( pMqttConnection, | |
| IOT_MQTT_CONNECT, | |
| NULL ); | |
| if( pOperation != NULL ) | |
| { | |
| pOperation->u.operation.status = status; | |
| _IotMqtt_Notify( pOperation ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| break; | |
| case MQTT_PACKET_TYPE_PUBLISH: | |
| IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection ); | |
| /* Allocate memory to handle the incoming PUBLISH. */ | |
| pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) ); | |
| if( pOperation == NULL ) | |
| { | |
| IotLogWarn( "Failed to allocate memory for incoming PUBLISH." ); | |
| status = IOT_MQTT_NO_MEMORY; | |
| break; | |
| } | |
| else | |
| { | |
| /* Set the members of the incoming PUBLISH operation. */ | |
| ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) ); | |
| pOperation->incomingPublish = true; | |
| pOperation->pMqttConnection = pMqttConnection; | |
| pIncomingPacket->u.pIncomingPublish = pOperation; | |
| } | |
| /* Deserialize incoming PUBLISH. */ | |
| status = _getPublishDeserializer( pMqttConnection->pSerializer )( pIncomingPacket ); | |
| if( status == IOT_MQTT_SUCCESS ) | |
| { | |
| /* Send a PUBACK for QoS 1 PUBLISH. */ | |
| if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 ) | |
| { | |
| _sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Transfer ownership of the received MQTT packet to the PUBLISH operation. */ | |
| pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData; | |
| pIncomingPacket->pRemainingData = NULL; | |
| /* Add the PUBLISH to the list of operations pending processing. */ | |
| IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
| IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ), | |
| &( pOperation->link ) ); | |
| IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
| /* Increment the MQTT connection reference count before scheduling an | |
| * incoming PUBLISH. */ | |
| if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true ) | |
| { | |
| /* Schedule PUBLISH for callback invocation. */ | |
| status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 ); | |
| } | |
| else | |
| { | |
| status = IOT_MQTT_NETWORK_ERROR; | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Free PUBLISH operation on error. */ | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| /* Check ownership of the received MQTT packet. */ | |
| if( pOperation->u.publish.pReceivedData != NULL ) | |
| { | |
| /* Retrieve the pointer MQTT packet pointer so it may be freed later. */ | |
| IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL ); | |
| pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData; | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Remove operation from pending processing list. */ | |
| IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
| if( IotLink_IsLinked( &( pOperation->link ) ) == true ) | |
| { | |
| IotListDouble_Remove( &( pOperation->link ) ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
| IotMqtt_Assert( pOperation != NULL ); | |
| IotMqtt_FreeOperation( pOperation ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| break; | |
| case MQTT_PACKET_TYPE_PUBACK: | |
| IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection ); | |
| /* Deserialize PUBACK and notify of result. */ | |
| status = _getPubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket ); | |
| pOperation = _IotMqtt_FindOperation( pMqttConnection, | |
| IOT_MQTT_PUBLISH_TO_SERVER, | |
| &( pIncomingPacket->packetIdentifier ) ); | |
| if( pOperation != NULL ) | |
| { | |
| pOperation->u.operation.status = status; | |
| _IotMqtt_Notify( pOperation ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| break; | |
| case MQTT_PACKET_TYPE_SUBACK: | |
| IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection ); | |
| /* Deserialize SUBACK and notify of result. */ | |
| pIncomingPacket->u.pMqttConnection = pMqttConnection; | |
| status = _getSubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket ); | |
| pOperation = _IotMqtt_FindOperation( pMqttConnection, | |
| IOT_MQTT_SUBSCRIBE, | |
| &( pIncomingPacket->packetIdentifier ) ); | |
| if( pOperation != NULL ) | |
| { | |
| pOperation->u.operation.status = status; | |
| _IotMqtt_Notify( pOperation ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| break; | |
| case MQTT_PACKET_TYPE_UNSUBACK: | |
| IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection ); | |
| /* Deserialize UNSUBACK and notify of result. */ | |
| status = _getUnsubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket ); | |
| pOperation = _IotMqtt_FindOperation( pMqttConnection, | |
| IOT_MQTT_UNSUBSCRIBE, | |
| &( pIncomingPacket->packetIdentifier ) ); | |
| if( pOperation != NULL ) | |
| { | |
| pOperation->u.operation.status = status; | |
| _IotMqtt_Notify( pOperation ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| break; | |
| default: | |
| /* The only remaining valid type is PINGRESP. */ | |
| IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP ); | |
| IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection ); | |
| /* Deserialize PINGRESP. */ | |
| status = _getPingrespDeserializer( pMqttConnection->pSerializer )( pIncomingPacket ); | |
| if( status == IOT_MQTT_SUCCESS ) | |
| { | |
| if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ), | |
| 0, | |
| 1 ) == 1 ) | |
| { | |
| IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.", | |
| pMqttConnection ); | |
| } | |
| else | |
| { | |
| IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.", | |
| pMqttConnection ); | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| break; | |
| } | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| IotLogError( "(MQTT connection %p) Packet parser status %s.", | |
| pMqttConnection, | |
| IotMqtt_strerror( status ) ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| return status; | |
| } | |
| /*-----------------------------------------------------------*/ | |
| static void _sendPuback( _mqttConnection_t * pMqttConnection, | |
| uint16_t packetIdentifier ) | |
| { | |
| IotMqttError_t status = IOT_MQTT_STATUS_PENDING; | |
| _mqttOperation_t * pPubackOperation = NULL; | |
| IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.", | |
| pMqttConnection, | |
| packetIdentifier ); | |
| /* Create a PUBACK operation. */ | |
| status = _IotMqtt_CreateOperation( pMqttConnection, | |
| 0, | |
| NULL, | |
| &pPubackOperation ); | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| IOT_GOTO_CLEANUP(); | |
| } | |
| /* Set the operation type. */ | |
| pPubackOperation->u.operation.type = IOT_MQTT_PUBACK; | |
| /* Generate a PUBACK packet from the packet identifier. */ | |
| status = _getMqttPubackSerializer( pMqttConnection->pSerializer )( packetIdentifier, | |
| &( pPubackOperation->u.operation.pMqttPacket ), | |
| &( pPubackOperation->u.operation.packetSize ) ); | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| IOT_GOTO_CLEANUP(); | |
| } | |
| /* Add the PUBACK operation to the send queue for network transmission. */ | |
| status = _IotMqtt_ScheduleOperation( pPubackOperation, | |
| _IotMqtt_ProcessSend, | |
| 0 ); | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.", | |
| pMqttConnection ); | |
| IOT_GOTO_CLEANUP(); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Clean up on error. */ | |
| IOT_FUNCTION_CLEANUP_BEGIN(); | |
| if( status != IOT_MQTT_SUCCESS ) | |
| { | |
| if( pPubackOperation != NULL ) | |
| { | |
| _IotMqtt_DestroyOperation( pPubackOperation ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| /*-----------------------------------------------------------*/ | |
| static void _flushPacket( void * pNetworkConnection, | |
| const _mqttConnection_t * pMqttConnection, | |
| size_t length ) | |
| { | |
| size_t bytesFlushed = 0; | |
| uint8_t receivedByte = 0; | |
| for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ ) | |
| { | |
| ( void ) _IotMqtt_GetNextByte( pNetworkConnection, | |
| pMqttConnection->pNetworkInterface, | |
| &receivedByte ); | |
| } | |
| } | |
| /*-----------------------------------------------------------*/ | |
| bool _IotMqtt_GetNextByte( void * pNetworkConnection, | |
| const IotNetworkInterface_t * pNetworkInterface, | |
| uint8_t * pIncomingByte ) | |
| { | |
| bool status = false; | |
| uint8_t incomingByte = 0; | |
| size_t bytesReceived = 0; | |
| /* Attempt to read 1 byte. */ | |
| bytesReceived = pNetworkInterface->receive( pNetworkConnection, | |
| &incomingByte, | |
| 1 ); | |
| /* Set the output parameter and return success if 1 byte was read. */ | |
| if( bytesReceived == 1 ) | |
| { | |
| *pIncomingByte = incomingByte; | |
| status = true; | |
| } | |
| else | |
| { | |
| /* Network receive must return 0 on failure. */ | |
| IotMqtt_Assert( bytesReceived == 0 ); | |
| } | |
| return status; | |
| } | |
| /*-----------------------------------------------------------*/ | |
| void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason, | |
| _mqttConnection_t * pMqttConnection ) | |
| { | |
| IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS; | |
| IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS; | |
| IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } }; | |
| void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL; | |
| /* Disconnect callback function. */ | |
| void ( * disconnectCallback )( void *, | |
| IotMqttCallbackParam_t * ) = NULL; | |
| /* Network close function. */ | |
| IotNetworkError_t ( * closeConnection) ( IotNetworkConnection_t ) = NULL; | |
| /* Mark the MQTT connection as disconnected and the keep-alive as failed. */ | |
| IotMutex_Lock( &( pMqttConnection->referencesMutex ) ); | |
| pMqttConnection->disconnected = true; | |
| if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 ) | |
| { | |
| /* Keep-alive must have a PINGREQ allocated. */ | |
| IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL ); | |
| IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 ); | |
| /* PINGREQ provides a reference to the connection, so reference count must | |
| * be nonzero. */ | |
| IotMqtt_Assert( pMqttConnection->references > 0 ); | |
| /* Attempt to cancel the keep-alive job. */ | |
| taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL, | |
| pMqttConnection->pingreq.job, | |
| NULL ); | |
| /* Clean up keep-alive if its job was successfully canceled. Otherwise, | |
| * the executing keep-alive job will clean up itself. */ | |
| if( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) | |
| { | |
| /* Free the packet */ | |
| _getMqttFreePacketFunc( pMqttConnection->pSerializer )( pMqttConnection->pingreq.u.operation.pMqttPacket ); | |
| /* Clear data about the keep-alive. */ | |
| pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0; | |
| pMqttConnection->pingreq.u.operation.pMqttPacket = NULL; | |
| pMqttConnection->pingreq.u.operation.packetSize = 0; | |
| /* Keep-alive is cleaned up; decrement reference count. Since this | |
| * function must be followed with a call to DISCONNECT, a check to | |
| * destroy the connection is not done here. */ | |
| pMqttConnection->references--; | |
| IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.", | |
| pMqttConnection ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Copy the function pointers and contexts, as the MQTT connection may be | |
| * modified after the mutex is released. */ | |
| disconnectCallback = pMqttConnection->disconnectCallback.function; | |
| pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext; | |
| closeConnection = pMqttConnection->pNetworkInterface->close; | |
| pNetworkConnection = pMqttConnection->pNetworkConnection; | |
| IotMutex_Unlock( &( pMqttConnection->referencesMutex ) ); | |
| /* Close the network connection. */ | |
| if( closeConnection != NULL ) | |
| { | |
| closeStatus = closeConnection( pNetworkConnection ); | |
| if( closeStatus == IOT_NETWORK_SUCCESS ) | |
| { | |
| IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection ); | |
| } | |
| else | |
| { | |
| IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.", | |
| pMqttConnection, | |
| closeStatus ); | |
| } | |
| } | |
| else | |
| { | |
| IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection" | |
| " not closed.", pMqttConnection ); | |
| } | |
| /* Invoke the disconnect callback. */ | |
| if( disconnectCallback != NULL ) | |
| { | |
| /* Set the members of the callback parameter. */ | |
| callbackParam.mqttConnection = pMqttConnection; | |
| callbackParam.u.disconnectReason = disconnectReason; | |
| disconnectCallback( pDisconnectCallbackContext, | |
| &callbackParam ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| /*-----------------------------------------------------------*/ | |
| void IotMqtt_ReceiveCallback( IotNetworkConnection_t pNetworkConnection, | |
| void * pReceiveContext ) | |
| { | |
| IotMqttError_t status = IOT_MQTT_SUCCESS; | |
| _mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL }; | |
| /* Cast context to correct type. */ | |
| _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext; | |
| /* Read an MQTT packet from the network. */ | |
| status = _getIncomingPacket( pNetworkConnection, | |
| pMqttConnection, | |
| &incomingPacket ); | |
| if( status == IOT_MQTT_SUCCESS ) | |
| { | |
| /* Deserialize the received packet. */ | |
| status = _deserializeIncomingPacket( pMqttConnection, | |
| &incomingPacket ); | |
| /* Free any buffers allocated for the MQTT packet. */ | |
| if( incomingPacket.pRemainingData != NULL ) | |
| { | |
| IotMqtt_FreeMessage( incomingPacket.pRemainingData ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| /* Close the network connection on a bad response. */ | |
| if( status == IOT_MQTT_BAD_RESPONSE ) | |
| { | |
| IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.", | |
| pMqttConnection ); | |
| _IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED, | |
| pMqttConnection ); | |
| } | |
| else | |
| { | |
| EMPTY_ELSE_MARKER; | |
| } | |
| } | |
| /*-----------------------------------------------------------*/ | |
| IotMqttError_t IotMqtt_GetIncomingMQTTPacketTypeAndLength( IotMqttPacketInfo_t * pIncomingPacket, | |
| IotMqttGetNextByte_t getNextByte, | |
| void * pNetworkConnection ) | |
| { | |
| IotMqttError_t status = IOT_MQTT_SUCCESS; | |
| /* Read the packet type, which is the first byte available. */ | |
| if( getNextByte( pNetworkConnection, &( pIncomingPacket->type ) ) == IOT_MQTT_SUCCESS ) | |
| { | |
| /* Check that the incoming packet type is valid. */ | |
| if( _incomingPacketValid( pIncomingPacket->type ) == false ) | |
| { | |
| IotLogError( "(MQTT connection) Unknown packet type %02x received.", | |
| pIncomingPacket->type ); | |
| status = IOT_MQTT_BAD_RESPONSE; | |
| } | |
| else | |
| { | |
| /* Read the remaining length. */ | |
| pIncomingPacket->remainingLength = _IotMqtt_GetRemainingLength_Generic( pNetworkConnection, | |
| getNextByte ); | |
| if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID ) | |
| { | |
| status = IOT_MQTT_BAD_RESPONSE; | |
| } | |
| } | |
| } | |
| else | |
| { | |
| status = IOT_MQTT_NETWORK_ERROR; | |
| } | |
| return status; | |
| } | |
| /*-----------------------------------------------------------*/ |