/* | |
* AWS IoT Shadow V2.1.0 | |
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy of | |
* this software and associated documentation files (the "Software"), to deal in | |
* the Software without restriction, including without limitation the rights to | |
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of | |
* the Software, and to permit persons to whom the Software is furnished to do so, | |
* subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS | |
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER | |
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
*/ | |
/** | |
* @file aws_iot_shadow_operation.c | |
* @brief Implements functions that process Shadow operations. | |
*/ | |
/* The config header is always included first. */ | |
#include "iot_config.h" | |
/* Standard includes. */ | |
#include <string.h> | |
/* Shadow internal include. */ | |
#include "private/aws_iot_shadow_internal.h" | |
/* Platform layer includes. */ | |
#include "platform/iot_threads.h" | |
/* MQTT include. */ | |
#include "iot_mqtt.h" | |
/* Error handling include. */ | |
#include "iot_error.h" | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief First parameter to #_shadowOperation_match. | |
*/ | |
typedef struct _operationMatchParams | |
{ | |
_shadowOperationType_t type; /**< @brief DELETE, GET, or UPDATE. */ | |
const char * pThingName; /**< @brief Thing Name of Shadow operation. */ | |
size_t thingNameLength; /**< @brief Length of #_operationMatchParams_t.pThingName. */ | |
const char * pDocument; /**< @brief Shadow UPDATE response document. */ | |
size_t documentLength; /**< @brief Length of #_operationMatchParams_t.pDocument. */ | |
} _operationMatchParams_t; | |
/*-----------------------------------------------------------*/ | |
/** | |
* @brief Match a received Shadow response with a Shadow operation awaiting a | |
* response. | |
* | |
* @param[in] pOperationLink Pointer to the link member of the #_shadowOperation_t | |
* to check. | |
* @param[in] pMatch Pointer to an #_operationMatchParams_t. | |
* | |
* @return `true` if `pMatch` matches the received response; `false` otherwise. | |
*/ | |
static bool _shadowOperation_match( const IotLink_t * pOperationLink, | |
void * pMatch ); | |
/** | |
* @brief Common function for processing received Shadow responses. | |
* | |
* @param[in] type DELETE, GET, or UPDATE. | |
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message). | |
*/ | |
static void _commonOperationCallback( _shadowOperationType_t type, | |
IotMqttCallbackParam_t * pMessage ); | |
/** | |
* @brief Invoked when a Shadow response is received for Shadow DELETE. | |
* | |
* @param[in] pArgument Ignored. | |
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message). | |
*/ | |
static void _deleteCallback( void * pArgument, | |
IotMqttCallbackParam_t * pMessage ); | |
/** | |
* @brief Invoked when a Shadow response is received for a Shadow GET. | |
* | |
* @param[in] pArgument Ignored. | |
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message). | |
*/ | |
static void _getCallback( void * pArgument, | |
IotMqttCallbackParam_t * pMessage ); | |
/** | |
* @brief Process an incoming Shadow document received when a Shadow GET is | |
* accepted. | |
* | |
* @param[in] pOperation The GET operation associated with the incoming Shadow | |
* document. | |
* @param[in] pPublishInfo The received Shadow document (as an MQTT PUBLISH | |
* message). | |
* | |
* @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY. Memory allocation | |
* only happens for a waitable `pOperation`. | |
*/ | |
static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation, | |
const IotMqttPublishInfo_t * pPublishInfo ); | |
/** | |
* @brief Invoked when a Shadow response is received for a Shadow UPDATE. | |
* | |
* @param[in] pArgument Ignored. | |
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message). | |
*/ | |
static void _updateCallback( void * pArgument, | |
IotMqttCallbackParam_t * pMessage ); | |
/** | |
* @brief Notify of a completed Shadow operation. | |
* | |
* @param[in] pOperation The operation which completed. | |
* | |
* Depending on the parameters passed to a user-facing Shadow function, the | |
* notification will cause @ref shadow_function_wait to return or invoke a | |
* user-provided callback. | |
*/ | |
static void _notifyCompletion( _shadowOperation_t * pOperation ); | |
/** | |
* @brief Get a Shadow subscription to use with a Shadow operation. | |
* | |
* This function may use an existing Shadow subscription, or it may allocate a | |
* new one. | |
* | |
* @param[in] pThingName Thing Name associated with operation. | |
* @param[in] thingNameLength Length of `pThingName`. | |
* @param[in] pTopicBuffer Contains the topic to use for subscribing. | |
* @param[in] operationTopicLength The length of the base topic in `pTopicBuffer`. | |
* @param[in] pOperation Shadow operation that needs a subscription. | |
* @param[out] pFreeTopicBuffer Whether the caller may free `pTopicBuffer` | |
* (which may be assigned to a subscription). | |
* | |
* @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY | |
*/ | |
static AwsIotShadowError_t _findSubscription( const char * pThingName, | |
size_t thingNameLength, | |
char * pTopicBuffer, | |
uint16_t operationTopicLength, | |
_shadowOperation_t * pOperation, | |
bool * pFreeTopicBuffer ); | |
/*-----------------------------------------------------------*/ | |
#if LIBRARY_LOG_LEVEL > IOT_LOG_NONE

/**
 * @brief Human-readable Shadow operation names for log messages.
 *
 * Indexed by #_shadowOperationType_t. Only compiled in when logging is enabled.
 */
    const char * const _pAwsIotShadowOperationNames[] =
    {
        "DELETE",
        "GET",
        "UPDATE",
        "SET DELTA",
        "SET UPDATED"
    };
#endif /* if LIBRARY_LOG_LEVEL > IOT_LOG_NONE */
/** | |
* @brief List of active Shadow operations awaiting a response from the Shadow | |
* service. | |
*/ | |
IotListDouble_t _AwsIotShadowPendingOperations = { 0 }; | |
/** | |
* @brief Protects #_AwsIotShadowPendingOperations from concurrent access. | |
*/ | |
IotMutex_t _AwsIotShadowPendingOperationsMutex; | |
/*-----------------------------------------------------------*/ | |
static bool _shadowOperation_match( const IotLink_t * pOperationLink, | |
void * pMatch ) | |
{ | |
/* Because this function is called from a container function, the given link | |
* must never be NULL. */ | |
AwsIotShadow_Assert( pOperationLink != NULL ); | |
_shadowOperation_t * pOperation = IotLink_Container( _shadowOperation_t, | |
pOperationLink, | |
link ); | |
_operationMatchParams_t * pParam = ( _operationMatchParams_t * ) pMatch; | |
_shadowSubscription_t * pSubscription = pOperation->pSubscription; | |
const char * pClientToken = NULL; | |
size_t clientTokenLength = 0; | |
/* Check for matching Thing Name and operation type. */ | |
bool match = ( pOperation->type == pParam->type ) && | |
( pParam->thingNameLength == pSubscription->thingNameLength ) && | |
( strncmp( pParam->pThingName, | |
pSubscription->pThingName, | |
pParam->thingNameLength ) == 0 ); | |
/* For a Shadow UPDATE operation, compare the client tokens. */ | |
if( ( match == true ) && ( pOperation->type == SHADOW_UPDATE ) ) | |
{ | |
/* Check document pointers. */ | |
AwsIotShadow_Assert( pParam->pDocument != NULL ); | |
AwsIotShadow_Assert( pParam->documentLength > 0 ); | |
AwsIotShadow_Assert( pOperation->u.update.pClientToken != NULL ); | |
AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 ); | |
IotLogDebug( "Verifying client tokens for Shadow UPDATE." ); | |
/* Check for the client token in the UPDATE response document. */ | |
match = AwsIot_GetClientToken( pParam->pDocument, | |
pParam->documentLength, | |
&pClientToken, | |
&clientTokenLength ); | |
/* If the UPDATE response document has a client token, check that it | |
* matches. */ | |
if( match == true ) | |
{ | |
match = ( clientTokenLength == pOperation->u.update.clientTokenLength ) && | |
( strncmp( pClientToken, | |
pOperation->u.update.pClientToken, | |
clientTokenLength ) == 0 ); | |
} | |
else | |
{ | |
IotLogWarn( "Received a Shadow UPDATE response with no client token. " | |
"This is possibly a response to a bad JSON document:\r\n%.*s", | |
pParam->documentLength, | |
pParam->pDocument ); | |
} | |
} | |
return match; | |
} | |
/*-----------------------------------------------------------*/ | |
static void _commonOperationCallback( _shadowOperationType_t type, | |
IotMqttCallbackParam_t * pMessage ) | |
{ | |
_shadowOperation_t * pOperation = NULL; | |
IotLink_t * pOperationLink = NULL; | |
AwsIotStatus_t status = AWS_IOT_UNKNOWN; | |
_operationMatchParams_t param = { .type = ( _shadowOperationType_t ) 0 }; | |
uint32_t flags = 0; | |
/* Set operation type to search. */ | |
param.type = type; | |
/* Set the response document for a Shadow UPDATE. */ | |
if( type == SHADOW_UPDATE ) | |
{ | |
param.pDocument = pMessage->u.message.info.pPayload; | |
param.documentLength = pMessage->u.message.info.payloadLength; | |
} | |
/* Parse the Thing Name from the MQTT topic name. */ | |
if( AwsIot_ParseThingName( pMessage->u.message.info.pTopicName, | |
pMessage->u.message.info.topicNameLength, | |
&( param.pThingName ), | |
&( param.thingNameLength ) ) == false ) | |
{ | |
IOT_GOTO_CLEANUP(); | |
} | |
/* Lock the pending operations list for exclusive access. */ | |
IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
/* Search for a matching pending operation. */ | |
pOperationLink = IotListDouble_FindFirstMatch( &( _AwsIotShadowPendingOperations ), | |
NULL, | |
_shadowOperation_match, | |
¶m ); | |
/* Find and remove the first Shadow operation of the given type. */ | |
if( pOperationLink == NULL ) | |
{ | |
/* Operation is not pending. It may have already been processed. Return | |
* without doing anything */ | |
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
IotLogWarn( "Shadow %s callback received an unknown operation.", | |
_pAwsIotShadowOperationNames[ type ] ); | |
IOT_GOTO_CLEANUP(); | |
} | |
else | |
{ | |
pOperation = IotLink_Container( _shadowOperation_t, pOperationLink, link ); | |
/* Remove a non-waitable operation from the pending operation list. | |
* Waitable operations are removed by the Wait function. */ | |
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 ) | |
{ | |
IotListDouble_Remove( &( pOperation->link ) ); | |
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
} | |
} | |
/* Check that the Shadow operation type and status. */ | |
AwsIotShadow_Assert( pOperation->type == type ); | |
AwsIotShadow_Assert( pOperation->status == AWS_IOT_SHADOW_STATUS_PENDING ); | |
IotLogDebug( "Received Shadow response on topic %.*s", | |
pMessage->u.message.info.topicNameLength, | |
pMessage->u.message.info.pTopicName ); | |
/* Parse the status from the topic name. */ | |
status = AwsIot_ParseStatus( pMessage->u.message.info.pTopicName, | |
pMessage->u.message.info.topicNameLength ); | |
switch( status ) | |
{ | |
case AWS_IOT_ACCEPTED: | |
IotLogInfo( "Shadow %s of %.*s was ACCEPTED.", | |
_pAwsIotShadowOperationNames[ type ], | |
pOperation->pSubscription->thingNameLength, | |
pOperation->pSubscription->pThingName ); | |
/* Process the retrieved document for a Shadow GET. Otherwise, set | |
* status to success. */ | |
if( type == SHADOW_GET ) | |
{ | |
pOperation->status = _processAcceptedGet( pOperation, | |
&( pMessage->u.message.info ) ); | |
} | |
else | |
{ | |
pOperation->status = AWS_IOT_SHADOW_SUCCESS; | |
} | |
break; | |
case AWS_IOT_REJECTED: | |
IotLogWarn( "Shadow %s of %.*s was REJECTED.", | |
_pAwsIotShadowOperationNames[ type ], | |
pOperation->pSubscription->thingNameLength, | |
pOperation->pSubscription->pThingName ); | |
pOperation->status = _AwsIotShadow_ParseErrorDocument( pMessage->u.message.info.pPayload, | |
pMessage->u.message.info.payloadLength ); | |
break; | |
default: | |
IotLogWarn( "Unknown status for %s of %.*s Shadow. Ignoring message.", | |
_pAwsIotShadowOperationNames[ type ], | |
pOperation->pSubscription->thingNameLength, | |
pOperation->pSubscription->pThingName ); | |
pOperation->status = AWS_IOT_SHADOW_BAD_RESPONSE; | |
break; | |
} | |
/* Copy the flags from the Shadow operation. The notify function may delete the operation. */ | |
flags = pOperation->flags; | |
/* Notify of operation completion. */ | |
_notifyCompletion( pOperation ); | |
/* For waitable operations, unlock the pending operation list mutex to allow | |
* the Wait function to run. */ | |
if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE ) | |
{ | |
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
} | |
/* This function has no return value and no cleanup, but uses the cleanup | |
* label to exit on error. */ | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
} | |
/*-----------------------------------------------------------*/ | |
static void _deleteCallback( void * pArgument, | |
IotMqttCallbackParam_t * pMessage ) | |
{ | |
/* Silence warnings about unused parameter. */ | |
( void ) pArgument; | |
_commonOperationCallback( SHADOW_DELETE, pMessage ); | |
} | |
/*-----------------------------------------------------------*/ | |
static void _getCallback( void * pArgument, | |
IotMqttCallbackParam_t * pMessage ) | |
{ | |
/* Silence warnings about unused parameter. */ | |
( void ) pArgument; | |
_commonOperationCallback( SHADOW_GET, pMessage ); | |
} | |
/*-----------------------------------------------------------*/ | |
static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation, | |
const IotMqttPublishInfo_t * pPublishInfo ) | |
{ | |
AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS; | |
/* A non-waitable operation can re-use the pointers from the publish info, | |
* since those are guaranteed to be in-scope throughout the user callback. | |
* But a waitable operation must copy the data from the publish info because | |
* AwsIotShadow_Wait may be called after the MQTT library frees the publish | |
* info. */ | |
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 ) | |
{ | |
pOperation->u.get.pDocument = pPublishInfo->pPayload; | |
pOperation->u.get.documentLength = pPublishInfo->payloadLength; | |
} | |
else | |
{ | |
IotLogDebug( "Allocating new buffer for waitable Shadow GET." ); | |
/* Parameter validation should not have allowed a NULL malloc function. */ | |
AwsIotShadow_Assert( pOperation->u.get.mallocDocument != NULL ); | |
/* Allocate a buffer for the retrieved document. */ | |
pOperation->u.get.pDocument = pOperation->u.get.mallocDocument( pPublishInfo->payloadLength ); | |
if( pOperation->u.get.pDocument == NULL ) | |
{ | |
IotLogError( "Failed to allocate buffer for retrieved Shadow document." ); | |
status = AWS_IOT_SHADOW_NO_MEMORY; | |
} | |
else | |
{ | |
/* Copy the retrieved document. */ | |
( void ) memcpy( ( void * ) pOperation->u.get.pDocument, | |
pPublishInfo->pPayload, | |
pPublishInfo->payloadLength ); | |
pOperation->u.get.documentLength = pPublishInfo->payloadLength; | |
} | |
} | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
static void _updateCallback( void * pArgument, | |
IotMqttCallbackParam_t * pMessage ) | |
{ | |
/* Silence warnings about unused parameter. */ | |
( void ) pArgument; | |
_commonOperationCallback( SHADOW_UPDATE, pMessage ); | |
} | |
/*-----------------------------------------------------------*/ | |
/*
 * Deliver the completion notification for a Shadow operation.
 *
 * pOperation: the operation that completed.
 *
 * A waitable operation only has its wait semaphore posted. Any other operation
 * releases its subscription references, has its user callback (if set)
 * invoked, and is then destroyed here.
 */
static void _notifyCompletion( _shadowOperation_t * pOperation )
{
    AwsIotShadowCallbackParam_t userParam = { .callbackType = ( AwsIotShadowCallbackType_t ) 0 };
    _shadowSubscription_t * pThingSubscription = pOperation->pSubscription;
    _shadowSubscription_t * pDetached = NULL;
    const bool waitable =
        ( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE );

    if( waitable == false )
    {
        /* Release this operation's references. When the count drops to 0 the
         * subscription is taken off the list and handed back via pDetached. */
        IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );
        _AwsIotShadow_DecrementReferences( pOperation,
                                           pThingSubscription->pTopicBuffer,
                                           &pDetached );
        IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );

        /* If a subscription was detached from the list, it is the one whose
         * Thing Name is reported to the user callback. */
        if( pDetached != NULL )
        {
            pThingSubscription = pDetached;
        }

        AwsIotShadow_Assert( pThingSubscription != NULL );

        /* Run the user callback when one was registered. */
        if( pOperation->notify.callback.function != NULL )
        {
            /* Members common to every operation type. */
            userParam.pThingName = pThingSubscription->pThingName;
            userParam.thingNameLength = pThingSubscription->thingNameLength;
            userParam.mqttConnection = pOperation->mqttConnection;
            userParam.callbackType = ( AwsIotShadowCallbackType_t ) pOperation->type;
            userParam.u.operation.reference = pOperation;
            userParam.u.operation.result = pOperation->status;

            /* A GET additionally reports the received document. */
            if( pOperation->type == SHADOW_GET )
            {
                userParam.u.operation.get.pDocument = pOperation->u.get.pDocument;
                userParam.u.operation.get.documentLength = pOperation->u.get.documentLength;
            }

            pOperation->notify.callback.function( pOperation->notify.callback.pCallbackContext,
                                                  &userParam );
        }

        /* A detached subscription is no longer needed once the callback has
         * returned. */
        if( pDetached != NULL )
        {
            _AwsIotShadow_DestroySubscription( pDetached );
        }

        _AwsIotShadow_DestroyOperation( pOperation );
    }
    else
    {
        /* A thread is (or will be) blocked on this operation; wake it. */
        IotSemaphore_Post( &( pOperation->notify.waitSemaphore ) );
    }
}
/*-----------------------------------------------------------*/ | |
static AwsIotShadowError_t _findSubscription( const char * pThingName, | |
size_t thingNameLength, | |
char * pTopicBuffer, | |
uint16_t operationTopicLength, | |
_shadowOperation_t * pOperation, | |
bool * pFreeTopicBuffer ) | |
{ | |
AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS; | |
_shadowSubscription_t * pSubscription = NULL; | |
/* Lookup table for Shadow operation callbacks. */ | |
const AwsIotMqttCallbackFunction_t shadowCallbacks[ SHADOW_OPERATION_COUNT ] = | |
{ | |
_deleteCallback, | |
_getCallback, | |
_updateCallback | |
}; | |
/* Lock the subscriptions mutex for exclusive access. */ | |
IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex ); | |
/* Check for an existing subscription. This function will attempt to allocate | |
* a new subscription if not found. */ | |
pSubscription = _AwsIotShadow_FindSubscription( pThingName, | |
thingNameLength, | |
true ); | |
if( pSubscription == NULL ) | |
{ | |
status = AWS_IOT_SHADOW_NO_MEMORY; | |
} | |
else | |
{ | |
/* Ensure that the subscription Thing Name matches. */ | |
AwsIotShadow_Assert( pSubscription != NULL ); | |
AwsIotShadow_Assert( pSubscription->thingNameLength == thingNameLength ); | |
AwsIotShadow_Assert( strncmp( pSubscription->pThingName, | |
pThingName, | |
thingNameLength ) == 0 ); | |
/* Set the subscription object for the Shadow operation. */ | |
pOperation->pSubscription = pSubscription; | |
/* Assign the topic buffer to the subscription to use for unsubscribing if | |
* the subscription has no topic buffer. */ | |
if( pSubscription->pTopicBuffer == NULL ) | |
{ | |
pSubscription->pTopicBuffer = pTopicBuffer; | |
/* Don't free the topic buffer if it was allocated to the subscription. */ | |
*pFreeTopicBuffer = false; | |
} | |
else | |
{ | |
*pFreeTopicBuffer = true; | |
} | |
/* Increment the reference count for this Shadow operation's | |
* subscriptions. */ | |
status = _AwsIotShadow_IncrementReferences( pOperation, | |
pTopicBuffer, | |
operationTopicLength, | |
shadowCallbacks[ pOperation->type ] ); | |
if( status != AWS_IOT_SHADOW_SUCCESS ) | |
{ | |
/* Failed to add subscriptions for a Shadow operation. The reference | |
* count was not incremented. Check if this subscription should be | |
* deleted. */ | |
_AwsIotShadow_RemoveSubscription( pSubscription, NULL ); | |
} | |
} | |
/* Unlock the Shadow subscription list mutex. */ | |
IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex ); | |
return status; | |
} | |
/*-----------------------------------------------------------*/ | |
AwsIotShadowError_t _AwsIotShadow_CreateOperation( _shadowOperation_t ** pNewOperation, | |
_shadowOperationType_t type, | |
uint32_t flags, | |
const AwsIotShadowCallbackInfo_t * pCallbackInfo ) | |
{ | |
IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_SUCCESS ); | |
_shadowOperation_t * pOperation = NULL; | |
IotLogDebug( "Creating operation record for Shadow %s.", | |
_pAwsIotShadowOperationNames[ type ] ); | |
/* Allocate memory for a new Shadow operation. */ | |
pOperation = AwsIotShadow_MallocOperation( sizeof( _shadowOperation_t ) ); | |
if( pOperation == NULL ) | |
{ | |
IotLogError( "Failed to allocate memory for Shadow %s.", | |
_pAwsIotShadowOperationNames[ type ] ); | |
IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY ); | |
} | |
/* Clear the operation data. */ | |
( void ) memset( pOperation, 0x00, sizeof( _shadowOperation_t ) ); | |
/* Check if the waitable flag is set. If it is, create a semaphore to | |
* wait on. */ | |
if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE ) | |
{ | |
if( IotSemaphore_Create( &( pOperation->notify.waitSemaphore ), 0, 1 ) == false ) | |
{ | |
IotLogError( "Failed to create semaphore for waitable Shadow %s.", | |
_pAwsIotShadowOperationNames[ type ] ); | |
IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY ); | |
} | |
} | |
else | |
{ | |
/* If the waitable flag isn't set but a callback is, copy the callback | |
* information. */ | |
if( pCallbackInfo != NULL ) | |
{ | |
pOperation->notify.callback = *pCallbackInfo; | |
} | |
} | |
/* Set the remaining common members of the Shadow operation. */ | |
pOperation->type = type; | |
pOperation->flags = flags; | |
pOperation->status = AWS_IOT_SHADOW_STATUS_PENDING; | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
if( status != AWS_IOT_SHADOW_SUCCESS ) | |
{ | |
if( pOperation != NULL ) | |
{ | |
AwsIotShadow_FreeOperation( pOperation ); | |
} | |
} | |
else | |
{ | |
/* Set the output parameter. */ | |
*pNewOperation = pOperation; | |
} | |
IOT_FUNCTION_CLEANUP_END(); | |
} | |
/*-----------------------------------------------------------*/ | |
void _AwsIotShadow_DestroyOperation( void * pData ) | |
{ | |
_shadowOperation_t * pOperation = ( _shadowOperation_t * ) pData; | |
/* The Shadow operation pointer must not be NULL. */ | |
AwsIotShadow_Assert( pOperation != NULL ); | |
IotLogDebug( "Destroying Shadow operation %s.", | |
_pAwsIotShadowOperationNames[ pOperation->type ] ); | |
/* Check if a wait semaphore was created for this operation. */ | |
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE ) | |
{ | |
/* Destroy the wait semaphore */ | |
IotSemaphore_Destroy( &( pOperation->notify.waitSemaphore ) ); | |
} | |
/* If this is a Shadow update, free any allocated client token. */ | |
if( ( pOperation->type == SHADOW_UPDATE ) && | |
( pOperation->u.update.pClientToken != NULL ) ) | |
{ | |
AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 ); | |
AwsIotShadow_FreeString( ( void * ) ( pOperation->u.update.pClientToken ) ); | |
} | |
/* Free the memory used to hold operation data. */ | |
AwsIotShadow_FreeOperation( pOperation ); | |
} | |
/*-----------------------------------------------------------*/ | |
/*
 * Build the MQTT topic for a Shadow DELETE, GET, or UPDATE operation.
 *
 * type: operation type; must be SHADOW_DELETE, SHADOW_GET, or SHADOW_UPDATE.
 * pThingName / thingNameLength: Thing Name to embed in the topic.
 * pTopicBuffer: passed through to AwsIot_GenerateOperationTopic to receive the
 * topic buffer.
 * pOperationTopicLength: passed through to receive the operation topic length.
 *
 * Returns AWS_IOT_SHADOW_SUCCESS, or AWS_IOT_SHADOW_NO_MEMORY when topic
 * generation fails.
 */
AwsIotShadowError_t _AwsIotShadow_GenerateShadowTopic( _shadowOperationType_t type,
                                                       const char * pThingName,
                                                       size_t thingNameLength,
                                                       char ** pTopicBuffer,
                                                       uint16_t * pOperationTopicLength )
{
    bool generated = false;

    /* Operation name strings, indexed by operation type. */
    const char * const operationNames[ SHADOW_OPERATION_COUNT ] =
    {
        SHADOW_DELETE_OPERATION_STRING, /* Shadow delete operation. */
        SHADOW_GET_OPERATION_STRING,    /* Shadow get operation. */
        SHADOW_UPDATE_OPERATION_STRING  /* Shadow update operation. */
    };

    /* Lengths of the operation name strings, indexed by operation type. */
    const uint16_t operationNameLengths[ SHADOW_OPERATION_COUNT ] =
    {
        SHADOW_DELETE_OPERATION_STRING_LENGTH, /* Shadow delete operation. */
        SHADOW_GET_OPERATION_STRING_LENGTH,    /* Shadow get operation. */
        SHADOW_UPDATE_OPERATION_STRING_LENGTH  /* Shadow update operation. */
    };

    /* Any other operation type would index outside the tables above. */
    AwsIotShadow_Assert( ( type == SHADOW_DELETE ) ||
                         ( type == SHADOW_GET ) ||
                         ( type == SHADOW_UPDATE ) );

    /* Describe the topic to generate; members not named here stay zero. */
    AwsIotTopicInfo_t topicInfo =
    {
        .pThingName          = pThingName,
        .thingNameLength     = thingNameLength,
        .pOperationName      = operationNames[ type ],
        .operationNameLength = operationNameLengths[ type ],
        .longestSuffixLength = SHADOW_LONGEST_SUFFIX_LENGTH,
        .mallocString        = AwsIotShadow_MallocString
    };

    generated = AwsIot_GenerateOperationTopic( &topicInfo,
                                               pTopicBuffer,
                                               pOperationTopicLength );

    return ( generated == false ) ? AWS_IOT_SHADOW_NO_MEMORY : AWS_IOT_SHADOW_SUCCESS;
}
/*-----------------------------------------------------------*/ | |
AwsIotShadowError_t _AwsIotShadow_ProcessOperation( IotMqttConnection_t mqttConnection, | |
const char * pThingName, | |
size_t thingNameLength, | |
_shadowOperation_t * pOperation, | |
const AwsIotShadowDocumentInfo_t * pDocumentInfo ) | |
{ | |
IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_STATUS_PENDING ); | |
IotMqttError_t publishStatus = IOT_MQTT_STATUS_PENDING; | |
char * pTopicBuffer = NULL; | |
uint16_t operationTopicLength = 0; | |
bool freeTopicBuffer = true; | |
IotMqttPublishInfo_t publishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER; | |
IotLogDebug( "Processing Shadow operation %s for Thing %.*s.", | |
_pAwsIotShadowOperationNames[ pOperation->type ], | |
thingNameLength, | |
pThingName ); | |
/* Set the operation's MQTT connection. */ | |
pOperation->mqttConnection = mqttConnection; | |
/* Generate the operation topic buffer. */ | |
status = _AwsIotShadow_GenerateShadowTopic( pOperation->type, | |
pThingName, | |
thingNameLength, | |
&pTopicBuffer, | |
&operationTopicLength ); | |
if( status != AWS_IOT_SHADOW_SUCCESS ) | |
{ | |
IotLogError( "No memory for Shadow operation topic buffer." ); | |
IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY ); | |
} | |
/* Get a subscription object for this Shadow operation. */ | |
status = _findSubscription( pThingName, | |
thingNameLength, | |
pTopicBuffer, | |
operationTopicLength, | |
pOperation, | |
&freeTopicBuffer ); | |
if( status != AWS_IOT_SHADOW_SUCCESS ) | |
{ | |
/* No subscription was found and no subscription could be allocated. */ | |
IOT_GOTO_CLEANUP(); | |
} | |
/* Set the operation topic name. */ | |
publishInfo.pTopicName = pTopicBuffer; | |
publishInfo.topicNameLength = operationTopicLength; | |
IotLogDebug( "Shadow %s message will be published to topic %.*s", | |
_pAwsIotShadowOperationNames[ pOperation->type ], | |
publishInfo.topicNameLength, | |
publishInfo.pTopicName ); | |
/* Set the document info if this operation is not a Shadow DELETE. */ | |
if( pOperation->type != SHADOW_DELETE ) | |
{ | |
publishInfo.qos = pDocumentInfo->qos; | |
publishInfo.retryLimit = pDocumentInfo->retryLimit; | |
publishInfo.retryMs = pDocumentInfo->retryMs; | |
IotLogDebug( "Shadow %s message will be published at QoS %d with " | |
"retryLimit %d and retryMs %llu.", | |
_pAwsIotShadowOperationNames[ pOperation->type ], | |
publishInfo.qos, | |
publishInfo.retryLimit, | |
publishInfo.retryMs ); | |
} | |
/* Set the PUBLISH payload to the update document for Shadow UPDATE. */ | |
if( pOperation->type == SHADOW_UPDATE ) | |
{ | |
publishInfo.pPayload = pDocumentInfo->u.update.pUpdateDocument; | |
publishInfo.payloadLength = pDocumentInfo->u.update.updateDocumentLength; | |
} | |
/* Set the PUBLISH payload to an empty string for Shadow DELETE and GET, | |
* per the Shadow spec. */ | |
else | |
{ | |
publishInfo.pPayload = ""; | |
publishInfo.payloadLength = 0; | |
} | |
/* Add Shadow operation to the pending operations list. */ | |
IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
IotListDouble_InsertHead( &( _AwsIotShadowPendingOperations ), | |
&( pOperation->link ) ); | |
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
/* Publish to the Shadow topic name. */ | |
publishStatus = IotMqtt_PublishSync( pOperation->mqttConnection, | |
&publishInfo, | |
0, | |
_AwsIotShadowMqttTimeoutMs ); | |
/* Check for errors from the MQTT publish. */ | |
if( publishStatus != IOT_MQTT_SUCCESS ) | |
{ | |
IotLogError( "Failed to publish MQTT message to %s %.*s Shadow, error %s.", | |
_pAwsIotShadowOperationNames[ pOperation->type ], | |
thingNameLength, | |
pThingName, | |
IotMqtt_strerror( publishStatus ) ); | |
/* Convert the MQTT "NO MEMORY" error to a Shadow "NO MEMORY" error. */ | |
status = SHADOW_CONVERT_STATUS_CODE_MQTT_TO_SHADOW( publishStatus ); | |
/* If the "keep subscriptions" flag is not set, decrement the reference | |
* count. */ | |
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_KEEP_SUBSCRIPTIONS ) == 0 ) | |
{ | |
IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex ); | |
_AwsIotShadow_DecrementReferences( pOperation, | |
pTopicBuffer, | |
NULL ); | |
IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex ); | |
} | |
/* Remove Shadow operation from the pending operations list. */ | |
IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
IotListDouble_Remove( &( pOperation->link ) ); | |
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) ); | |
} | |
else | |
{ | |
IotLogDebug( "Shadow %s PUBLISH message successfully sent.", | |
_pAwsIotShadowOperationNames[ pOperation->type ] ); | |
} | |
IOT_FUNCTION_CLEANUP_BEGIN(); | |
/* Free the topic buffer used by this function if it was not assigned to a | |
* subscription. */ | |
if( ( freeTopicBuffer == true ) && ( pTopicBuffer != NULL ) ) | |
{ | |
AwsIotShadow_FreeString( pTopicBuffer ); | |
} | |
/* Destroy the Shadow operation on failure. */ | |
if( status != AWS_IOT_SHADOW_SUCCESS ) | |
{ | |
_AwsIotShadow_DestroyOperation( pOperation ); | |
} | |
else | |
{ | |
/* Convert successful return code to "status pending", as the Shadow | |
* library is now waiting for a response from the service. */ | |
status = AWS_IOT_SHADOW_STATUS_PENDING; | |
} | |
IOT_FUNCTION_CLEANUP_END(); | |
} | |
/*-----------------------------------------------------------*/ |