blob: 27f7f91efef7b41256dfc70b1ea66e719dc2ac61 [file] [log] [blame]
/*
* AWS IoT Shadow V2.1.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file aws_iot_shadow_operation.c
* @brief Implements functions that process Shadow operations.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* Shadow internal include. */
#include "private/aws_iot_shadow_internal.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/* MQTT include. */
#include "iot_mqtt.h"
/* Error handling include. */
#include "iot_error.h"
/*-----------------------------------------------------------*/
/**
* @brief First parameter to #_shadowOperation_match.
*/
typedef struct _operationMatchParams
{
_shadowOperationType_t type; /**< @brief DELETE, GET, or UPDATE. */
const char * pThingName; /**< @brief Thing Name of Shadow operation. */
size_t thingNameLength; /**< @brief Length of #_operationMatchParams_t.pThingName. */
const char * pDocument; /**< @brief Shadow UPDATE response document. */
size_t documentLength; /**< @brief Length of #_operationMatchParams_t.pDocument. */
} _operationMatchParams_t;
/*-----------------------------------------------------------*/
/**
* @brief Match a received Shadow response with a Shadow operation awaiting a
* response.
*
* @param[in] pOperationLink Pointer to the link member of the #_shadowOperation_t
* to check.
* @param[in] pMatch Pointer to an #_operationMatchParams_t.
*
* @return `true` if `pMatch` matches the received response; `false` otherwise.
*/
static bool _shadowOperation_match( const IotLink_t * pOperationLink,
void * pMatch );
/**
* @brief Common function for processing received Shadow responses.
*
* @param[in] type DELETE, GET, or UPDATE.
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).
*/
static void _commonOperationCallback( _shadowOperationType_t type,
IotMqttCallbackParam_t * pMessage );
/**
* @brief Invoked when a Shadow response is received for Shadow DELETE.
*
* @param[in] pArgument Ignored.
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).
*/
static void _deleteCallback( void * pArgument,
IotMqttCallbackParam_t * pMessage );
/**
* @brief Invoked when a Shadow response is received for a Shadow GET.
*
* @param[in] pArgument Ignored.
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).
*/
static void _getCallback( void * pArgument,
IotMqttCallbackParam_t * pMessage );
/**
* @brief Process an incoming Shadow document received when a Shadow GET is
* accepted.
*
* @param[in] pOperation The GET operation associated with the incoming Shadow
* document.
* @param[in] pPublishInfo The received Shadow document (as an MQTT PUBLISH
* message).
*
* @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY. Memory allocation
* only happens for a waitable `pOperation`.
*/
static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation,
const IotMqttPublishInfo_t * pPublishInfo );
/**
* @brief Invoked when a Shadow response is received for a Shadow UPDATE.
*
* @param[in] pArgument Ignored.
* @param[in] pMessage Received Shadow response (as an MQTT PUBLISH message).
*/
static void _updateCallback( void * pArgument,
IotMqttCallbackParam_t * pMessage );
/**
* @brief Notify of a completed Shadow operation.
*
* @param[in] pOperation The operation which completed.
*
* Depending on the parameters passed to a user-facing Shadow function, the
* notification will cause @ref shadow_function_wait to return or invoke a
* user-provided callback.
*/
static void _notifyCompletion( _shadowOperation_t * pOperation );
/**
* @brief Get a Shadow subscription to use with a Shadow operation.
*
* This function may use an existing Shadow subscription, or it may allocate a
* new one.
*
* @param[in] pThingName Thing Name associated with operation.
* @param[in] thingNameLength Length of `pThingName`.
* @param[in] pTopicBuffer Contains the topic to use for subscribing.
* @param[in] operationTopicLength The length of the base topic in `pTopicBuffer`.
* @param[in] pOperation Shadow operation that needs a subscription.
* @param[out] pFreeTopicBuffer Whether the caller may free `pTopicBuffer`
* (which may be assigned to a subscription).
*
* @return #AWS_IOT_SHADOW_SUCCESS or #AWS_IOT_SHADOW_NO_MEMORY
*/
static AwsIotShadowError_t _findSubscription( const char * pThingName,
size_t thingNameLength,
char * pTopicBuffer,
uint16_t operationTopicLength,
_shadowOperation_t * pOperation,
bool * pFreeTopicBuffer );
/*-----------------------------------------------------------*/
#if LIBRARY_LOG_LEVEL > IOT_LOG_NONE
/**
* @brief Printable names for each of the Shadow operations.
*/
const char * const _pAwsIotShadowOperationNames[] =
{
"DELETE",
"GET",
"UPDATE",
"SET DELTA",
"SET UPDATED"
};
#endif /* if LIBRARY_LOG_LEVEL > IOT_LOG_NONE */
/**
* @brief List of active Shadow operations awaiting a response from the Shadow
* service.
*/
IotListDouble_t _AwsIotShadowPendingOperations = { 0 };
/**
* @brief Protects #_AwsIotShadowPendingOperations from concurrent access.
*/
IotMutex_t _AwsIotShadowPendingOperationsMutex;
/*-----------------------------------------------------------*/
static bool _shadowOperation_match( const IotLink_t * pOperationLink,
void * pMatch )
{
/* Because this function is called from a container function, the given link
* must never be NULL. */
AwsIotShadow_Assert( pOperationLink != NULL );
_shadowOperation_t * pOperation = IotLink_Container( _shadowOperation_t,
pOperationLink,
link );
_operationMatchParams_t * pParam = ( _operationMatchParams_t * ) pMatch;
_shadowSubscription_t * pSubscription = pOperation->pSubscription;
const char * pClientToken = NULL;
size_t clientTokenLength = 0;
/* Check for matching Thing Name and operation type. */
bool match = ( pOperation->type == pParam->type ) &&
( pParam->thingNameLength == pSubscription->thingNameLength ) &&
( strncmp( pParam->pThingName,
pSubscription->pThingName,
pParam->thingNameLength ) == 0 );
/* For a Shadow UPDATE operation, compare the client tokens. */
if( ( match == true ) && ( pOperation->type == SHADOW_UPDATE ) )
{
/* Check document pointers. */
AwsIotShadow_Assert( pParam->pDocument != NULL );
AwsIotShadow_Assert( pParam->documentLength > 0 );
AwsIotShadow_Assert( pOperation->u.update.pClientToken != NULL );
AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 );
IotLogDebug( "Verifying client tokens for Shadow UPDATE." );
/* Check for the client token in the UPDATE response document. */
match = AwsIot_GetClientToken( pParam->pDocument,
pParam->documentLength,
&pClientToken,
&clientTokenLength );
/* If the UPDATE response document has a client token, check that it
* matches. */
if( match == true )
{
match = ( clientTokenLength == pOperation->u.update.clientTokenLength ) &&
( strncmp( pClientToken,
pOperation->u.update.pClientToken,
clientTokenLength ) == 0 );
}
else
{
IotLogWarn( "Received a Shadow UPDATE response with no client token. "
"This is possibly a response to a bad JSON document:\r\n%.*s",
pParam->documentLength,
pParam->pDocument );
}
}
return match;
}
/*-----------------------------------------------------------*/
static void _commonOperationCallback( _shadowOperationType_t type,
IotMqttCallbackParam_t * pMessage )
{
_shadowOperation_t * pOperation = NULL;
IotLink_t * pOperationLink = NULL;
AwsIotStatus_t status = AWS_IOT_UNKNOWN;
_operationMatchParams_t param = { .type = ( _shadowOperationType_t ) 0 };
uint32_t flags = 0;
/* Set operation type to search. */
param.type = type;
/* Set the response document for a Shadow UPDATE. */
if( type == SHADOW_UPDATE )
{
param.pDocument = pMessage->u.message.info.pPayload;
param.documentLength = pMessage->u.message.info.payloadLength;
}
/* Parse the Thing Name from the MQTT topic name. */
if( AwsIot_ParseThingName( pMessage->u.message.info.pTopicName,
pMessage->u.message.info.topicNameLength,
&( param.pThingName ),
&( param.thingNameLength ) ) == false )
{
IOT_GOTO_CLEANUP();
}
/* Lock the pending operations list for exclusive access. */
IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );
/* Search for a matching pending operation. */
pOperationLink = IotListDouble_FindFirstMatch( &( _AwsIotShadowPendingOperations ),
NULL,
_shadowOperation_match,
&param );
/* Find and remove the first Shadow operation of the given type. */
if( pOperationLink == NULL )
{
/* Operation is not pending. It may have already been processed. Return
* without doing anything */
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );
IotLogWarn( "Shadow %s callback received an unknown operation.",
_pAwsIotShadowOperationNames[ type ] );
IOT_GOTO_CLEANUP();
}
else
{
pOperation = IotLink_Container( _shadowOperation_t, pOperationLink, link );
/* Remove a non-waitable operation from the pending operation list.
* Waitable operations are removed by the Wait function. */
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 )
{
IotListDouble_Remove( &( pOperation->link ) );
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );
}
}
/* Check that the Shadow operation type and status. */
AwsIotShadow_Assert( pOperation->type == type );
AwsIotShadow_Assert( pOperation->status == AWS_IOT_SHADOW_STATUS_PENDING );
IotLogDebug( "Received Shadow response on topic %.*s",
pMessage->u.message.info.topicNameLength,
pMessage->u.message.info.pTopicName );
/* Parse the status from the topic name. */
status = AwsIot_ParseStatus( pMessage->u.message.info.pTopicName,
pMessage->u.message.info.topicNameLength );
switch( status )
{
case AWS_IOT_ACCEPTED:
IotLogInfo( "Shadow %s of %.*s was ACCEPTED.",
_pAwsIotShadowOperationNames[ type ],
pOperation->pSubscription->thingNameLength,
pOperation->pSubscription->pThingName );
/* Process the retrieved document for a Shadow GET. Otherwise, set
* status to success. */
if( type == SHADOW_GET )
{
pOperation->status = _processAcceptedGet( pOperation,
&( pMessage->u.message.info ) );
}
else
{
pOperation->status = AWS_IOT_SHADOW_SUCCESS;
}
break;
case AWS_IOT_REJECTED:
IotLogWarn( "Shadow %s of %.*s was REJECTED.",
_pAwsIotShadowOperationNames[ type ],
pOperation->pSubscription->thingNameLength,
pOperation->pSubscription->pThingName );
pOperation->status = _AwsIotShadow_ParseErrorDocument( pMessage->u.message.info.pPayload,
pMessage->u.message.info.payloadLength );
break;
default:
IotLogWarn( "Unknown status for %s of %.*s Shadow. Ignoring message.",
_pAwsIotShadowOperationNames[ type ],
pOperation->pSubscription->thingNameLength,
pOperation->pSubscription->pThingName );
pOperation->status = AWS_IOT_SHADOW_BAD_RESPONSE;
break;
}
/* Copy the flags from the Shadow operation. The notify function may delete the operation. */
flags = pOperation->flags;
/* Notify of operation completion. */
_notifyCompletion( pOperation );
/* For waitable operations, unlock the pending operation list mutex to allow
* the Wait function to run. */
if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )
{
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );
}
/* This function has no return value and no cleanup, but uses the cleanup
* label to exit on error. */
IOT_FUNCTION_CLEANUP_BEGIN();
}
/*-----------------------------------------------------------*/
static void _deleteCallback( void * pArgument,
IotMqttCallbackParam_t * pMessage )
{
/* Silence warnings about unused parameter. */
( void ) pArgument;
_commonOperationCallback( SHADOW_DELETE, pMessage );
}
/*-----------------------------------------------------------*/
static void _getCallback( void * pArgument,
IotMqttCallbackParam_t * pMessage )
{
/* Silence warnings about unused parameter. */
( void ) pArgument;
_commonOperationCallback( SHADOW_GET, pMessage );
}
/*-----------------------------------------------------------*/
static AwsIotShadowError_t _processAcceptedGet( _shadowOperation_t * pOperation,
const IotMqttPublishInfo_t * pPublishInfo )
{
AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;
/* A non-waitable operation can re-use the pointers from the publish info,
* since those are guaranteed to be in-scope throughout the user callback.
* But a waitable operation must copy the data from the publish info because
* AwsIotShadow_Wait may be called after the MQTT library frees the publish
* info. */
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == 0 )
{
pOperation->u.get.pDocument = pPublishInfo->pPayload;
pOperation->u.get.documentLength = pPublishInfo->payloadLength;
}
else
{
IotLogDebug( "Allocating new buffer for waitable Shadow GET." );
/* Parameter validation should not have allowed a NULL malloc function. */
AwsIotShadow_Assert( pOperation->u.get.mallocDocument != NULL );
/* Allocate a buffer for the retrieved document. */
pOperation->u.get.pDocument = pOperation->u.get.mallocDocument( pPublishInfo->payloadLength );
if( pOperation->u.get.pDocument == NULL )
{
IotLogError( "Failed to allocate buffer for retrieved Shadow document." );
status = AWS_IOT_SHADOW_NO_MEMORY;
}
else
{
/* Copy the retrieved document. */
( void ) memcpy( ( void * ) pOperation->u.get.pDocument,
pPublishInfo->pPayload,
pPublishInfo->payloadLength );
pOperation->u.get.documentLength = pPublishInfo->payloadLength;
}
}
return status;
}
/*-----------------------------------------------------------*/
static void _updateCallback( void * pArgument,
IotMqttCallbackParam_t * pMessage )
{
/* Silence warnings about unused parameter. */
( void ) pArgument;
_commonOperationCallback( SHADOW_UPDATE, pMessage );
}
/*-----------------------------------------------------------*/
static void _notifyCompletion( _shadowOperation_t * pOperation )
{
AwsIotShadowCallbackParam_t callbackParam = { .callbackType = ( AwsIotShadowCallbackType_t ) 0 };
_shadowSubscription_t * pSubscription = pOperation->pSubscription,
* pRemovedSubscription = NULL;
/* If the operation is waiting, post to its wait semaphore and return. */
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )
{
IotSemaphore_Post( &( pOperation->notify.waitSemaphore ) );
}
else
{
/* Decrement the reference count. This also removes subscriptions if the
* count reaches 0. */
IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );
_AwsIotShadow_DecrementReferences( pOperation,
pSubscription->pTopicBuffer,
&pRemovedSubscription );
IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );
/* Set the subscription pointer used for the user callback based on whether
* a subscription was removed from the list. */
if( pRemovedSubscription != NULL )
{
pSubscription = pRemovedSubscription;
}
AwsIotShadow_Assert( pSubscription != NULL );
/* Invoke the user callback if provided. */
if( pOperation->notify.callback.function != NULL )
{
/* Set the common members of the callback parameter. */
callbackParam.callbackType = ( AwsIotShadowCallbackType_t ) pOperation->type;
callbackParam.mqttConnection = pOperation->mqttConnection;
callbackParam.u.operation.result = pOperation->status;
callbackParam.u.operation.reference = pOperation;
callbackParam.pThingName = pSubscription->pThingName;
callbackParam.thingNameLength = pSubscription->thingNameLength;
/* Set the members of the callback parameter for a received document. */
if( pOperation->type == SHADOW_GET )
{
callbackParam.u.operation.get.pDocument = pOperation->u.get.pDocument;
callbackParam.u.operation.get.documentLength = pOperation->u.get.documentLength;
}
pOperation->notify.callback.function( pOperation->notify.callback.pCallbackContext,
&callbackParam );
}
/* Destroy a removed subscription. */
if( pRemovedSubscription != NULL )
{
_AwsIotShadow_DestroySubscription( pRemovedSubscription );
}
_AwsIotShadow_DestroyOperation( pOperation );
}
}
/*-----------------------------------------------------------*/
static AwsIotShadowError_t _findSubscription( const char * pThingName,
size_t thingNameLength,
char * pTopicBuffer,
uint16_t operationTopicLength,
_shadowOperation_t * pOperation,
bool * pFreeTopicBuffer )
{
AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;
_shadowSubscription_t * pSubscription = NULL;
/* Lookup table for Shadow operation callbacks. */
const AwsIotMqttCallbackFunction_t shadowCallbacks[ SHADOW_OPERATION_COUNT ] =
{
_deleteCallback,
_getCallback,
_updateCallback
};
/* Lock the subscriptions mutex for exclusive access. */
IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );
/* Check for an existing subscription. This function will attempt to allocate
* a new subscription if not found. */
pSubscription = _AwsIotShadow_FindSubscription( pThingName,
thingNameLength,
true );
if( pSubscription == NULL )
{
status = AWS_IOT_SHADOW_NO_MEMORY;
}
else
{
/* Ensure that the subscription Thing Name matches. */
AwsIotShadow_Assert( pSubscription != NULL );
AwsIotShadow_Assert( pSubscription->thingNameLength == thingNameLength );
AwsIotShadow_Assert( strncmp( pSubscription->pThingName,
pThingName,
thingNameLength ) == 0 );
/* Set the subscription object for the Shadow operation. */
pOperation->pSubscription = pSubscription;
/* Assign the topic buffer to the subscription to use for unsubscribing if
* the subscription has no topic buffer. */
if( pSubscription->pTopicBuffer == NULL )
{
pSubscription->pTopicBuffer = pTopicBuffer;
/* Don't free the topic buffer if it was allocated to the subscription. */
*pFreeTopicBuffer = false;
}
else
{
*pFreeTopicBuffer = true;
}
/* Increment the reference count for this Shadow operation's
* subscriptions. */
status = _AwsIotShadow_IncrementReferences( pOperation,
pTopicBuffer,
operationTopicLength,
shadowCallbacks[ pOperation->type ] );
if( status != AWS_IOT_SHADOW_SUCCESS )
{
/* Failed to add subscriptions for a Shadow operation. The reference
* count was not incremented. Check if this subscription should be
* deleted. */
_AwsIotShadow_RemoveSubscription( pSubscription, NULL );
}
}
/* Unlock the Shadow subscription list mutex. */
IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );
return status;
}
/*-----------------------------------------------------------*/
AwsIotShadowError_t _AwsIotShadow_CreateOperation( _shadowOperation_t ** pNewOperation,
_shadowOperationType_t type,
uint32_t flags,
const AwsIotShadowCallbackInfo_t * pCallbackInfo )
{
IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_SUCCESS );
_shadowOperation_t * pOperation = NULL;
IotLogDebug( "Creating operation record for Shadow %s.",
_pAwsIotShadowOperationNames[ type ] );
/* Allocate memory for a new Shadow operation. */
pOperation = AwsIotShadow_MallocOperation( sizeof( _shadowOperation_t ) );
if( pOperation == NULL )
{
IotLogError( "Failed to allocate memory for Shadow %s.",
_pAwsIotShadowOperationNames[ type ] );
IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );
}
/* Clear the operation data. */
( void ) memset( pOperation, 0x00, sizeof( _shadowOperation_t ) );
/* Check if the waitable flag is set. If it is, create a semaphore to
* wait on. */
if( ( flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )
{
if( IotSemaphore_Create( &( pOperation->notify.waitSemaphore ), 0, 1 ) == false )
{
IotLogError( "Failed to create semaphore for waitable Shadow %s.",
_pAwsIotShadowOperationNames[ type ] );
IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );
}
}
else
{
/* If the waitable flag isn't set but a callback is, copy the callback
* information. */
if( pCallbackInfo != NULL )
{
pOperation->notify.callback = *pCallbackInfo;
}
}
/* Set the remaining common members of the Shadow operation. */
pOperation->type = type;
pOperation->flags = flags;
pOperation->status = AWS_IOT_SHADOW_STATUS_PENDING;
IOT_FUNCTION_CLEANUP_BEGIN();
if( status != AWS_IOT_SHADOW_SUCCESS )
{
if( pOperation != NULL )
{
AwsIotShadow_FreeOperation( pOperation );
}
}
else
{
/* Set the output parameter. */
*pNewOperation = pOperation;
}
IOT_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/
void _AwsIotShadow_DestroyOperation( void * pData )
{
_shadowOperation_t * pOperation = ( _shadowOperation_t * ) pData;
/* The Shadow operation pointer must not be NULL. */
AwsIotShadow_Assert( pOperation != NULL );
IotLogDebug( "Destroying Shadow operation %s.",
_pAwsIotShadowOperationNames[ pOperation->type ] );
/* Check if a wait semaphore was created for this operation. */
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_WAITABLE ) == AWS_IOT_SHADOW_FLAG_WAITABLE )
{
/* Destroy the wait semaphore */
IotSemaphore_Destroy( &( pOperation->notify.waitSemaphore ) );
}
/* If this is a Shadow update, free any allocated client token. */
if( ( pOperation->type == SHADOW_UPDATE ) &&
( pOperation->u.update.pClientToken != NULL ) )
{
AwsIotShadow_Assert( pOperation->u.update.clientTokenLength > 0 );
AwsIotShadow_FreeString( ( void * ) ( pOperation->u.update.pClientToken ) );
}
/* Free the memory used to hold operation data. */
AwsIotShadow_FreeOperation( pOperation );
}
/*-----------------------------------------------------------*/
AwsIotShadowError_t _AwsIotShadow_GenerateShadowTopic( _shadowOperationType_t type,
const char * pThingName,
size_t thingNameLength,
char ** pTopicBuffer,
uint16_t * pOperationTopicLength )
{
AwsIotShadowError_t status = AWS_IOT_SHADOW_SUCCESS;
AwsIotTopicInfo_t topicInfo = { 0 };
/* Lookup table for Shadow operation strings. */
const char * const pOperationString[ SHADOW_OPERATION_COUNT ] =
{
SHADOW_DELETE_OPERATION_STRING, /* Shadow delete operation. */
SHADOW_GET_OPERATION_STRING, /* Shadow get operation. */
SHADOW_UPDATE_OPERATION_STRING /* Shadow update operation. */
};
/* Lookup table for Shadow operation string lengths. */
const uint16_t pOperationStringLength[ SHADOW_OPERATION_COUNT ] =
{
SHADOW_DELETE_OPERATION_STRING_LENGTH, /* Shadow delete operation. */
SHADOW_GET_OPERATION_STRING_LENGTH, /* Shadow get operation. */
SHADOW_UPDATE_OPERATION_STRING_LENGTH /* Shadow update operation. */
};
/* Only Shadow delete, get, and update operation types should be passed to this
* function. */
AwsIotShadow_Assert( ( type == SHADOW_DELETE ) ||
( type == SHADOW_GET ) ||
( type == SHADOW_UPDATE ) );
/* Set the members needed to generate an operation topic. */
topicInfo.pThingName = pThingName;
topicInfo.thingNameLength = thingNameLength;
topicInfo.pOperationName = pOperationString[ type ];
topicInfo.operationNameLength = pOperationStringLength[ type ];
topicInfo.longestSuffixLength = SHADOW_LONGEST_SUFFIX_LENGTH;
topicInfo.mallocString = AwsIotShadow_MallocString;
if( AwsIot_GenerateOperationTopic( &topicInfo,
pTopicBuffer,
pOperationTopicLength ) == false )
{
status = AWS_IOT_SHADOW_NO_MEMORY;
}
return status;
}
/*-----------------------------------------------------------*/
AwsIotShadowError_t _AwsIotShadow_ProcessOperation( IotMqttConnection_t mqttConnection,
const char * pThingName,
size_t thingNameLength,
_shadowOperation_t * pOperation,
const AwsIotShadowDocumentInfo_t * pDocumentInfo )
{
IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_STATUS_PENDING );
IotMqttError_t publishStatus = IOT_MQTT_STATUS_PENDING;
char * pTopicBuffer = NULL;
uint16_t operationTopicLength = 0;
bool freeTopicBuffer = true;
IotMqttPublishInfo_t publishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;
IotLogDebug( "Processing Shadow operation %s for Thing %.*s.",
_pAwsIotShadowOperationNames[ pOperation->type ],
thingNameLength,
pThingName );
/* Set the operation's MQTT connection. */
pOperation->mqttConnection = mqttConnection;
/* Generate the operation topic buffer. */
status = _AwsIotShadow_GenerateShadowTopic( pOperation->type,
pThingName,
thingNameLength,
&pTopicBuffer,
&operationTopicLength );
if( status != AWS_IOT_SHADOW_SUCCESS )
{
IotLogError( "No memory for Shadow operation topic buffer." );
IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_SHADOW_NO_MEMORY );
}
/* Get a subscription object for this Shadow operation. */
status = _findSubscription( pThingName,
thingNameLength,
pTopicBuffer,
operationTopicLength,
pOperation,
&freeTopicBuffer );
if( status != AWS_IOT_SHADOW_SUCCESS )
{
/* No subscription was found and no subscription could be allocated. */
IOT_GOTO_CLEANUP();
}
/* Set the operation topic name. */
publishInfo.pTopicName = pTopicBuffer;
publishInfo.topicNameLength = operationTopicLength;
IotLogDebug( "Shadow %s message will be published to topic %.*s",
_pAwsIotShadowOperationNames[ pOperation->type ],
publishInfo.topicNameLength,
publishInfo.pTopicName );
/* Set the document info if this operation is not a Shadow DELETE. */
if( pOperation->type != SHADOW_DELETE )
{
publishInfo.qos = pDocumentInfo->qos;
publishInfo.retryLimit = pDocumentInfo->retryLimit;
publishInfo.retryMs = pDocumentInfo->retryMs;
IotLogDebug( "Shadow %s message will be published at QoS %d with "
"retryLimit %d and retryMs %llu.",
_pAwsIotShadowOperationNames[ pOperation->type ],
publishInfo.qos,
publishInfo.retryLimit,
publishInfo.retryMs );
}
/* Set the PUBLISH payload to the update document for Shadow UPDATE. */
if( pOperation->type == SHADOW_UPDATE )
{
publishInfo.pPayload = pDocumentInfo->u.update.pUpdateDocument;
publishInfo.payloadLength = pDocumentInfo->u.update.updateDocumentLength;
}
/* Set the PUBLISH payload to an empty string for Shadow DELETE and GET,
* per the Shadow spec. */
else
{
publishInfo.pPayload = "";
publishInfo.payloadLength = 0;
}
/* Add Shadow operation to the pending operations list. */
IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );
IotListDouble_InsertHead( &( _AwsIotShadowPendingOperations ),
&( pOperation->link ) );
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );
/* Publish to the Shadow topic name. */
publishStatus = IotMqtt_PublishSync( pOperation->mqttConnection,
&publishInfo,
0,
_AwsIotShadowMqttTimeoutMs );
/* Check for errors from the MQTT publish. */
if( publishStatus != IOT_MQTT_SUCCESS )
{
IotLogError( "Failed to publish MQTT message to %s %.*s Shadow, error %s.",
_pAwsIotShadowOperationNames[ pOperation->type ],
thingNameLength,
pThingName,
IotMqtt_strerror( publishStatus ) );
/* Convert the MQTT "NO MEMORY" error to a Shadow "NO MEMORY" error. */
status = SHADOW_CONVERT_STATUS_CODE_MQTT_TO_SHADOW( publishStatus );
/* If the "keep subscriptions" flag is not set, decrement the reference
* count. */
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_KEEP_SUBSCRIPTIONS ) == 0 )
{
IotMutex_Lock( &_AwsIotShadowSubscriptionsMutex );
_AwsIotShadow_DecrementReferences( pOperation,
pTopicBuffer,
NULL );
IotMutex_Unlock( &_AwsIotShadowSubscriptionsMutex );
}
/* Remove Shadow operation from the pending operations list. */
IotMutex_Lock( &( _AwsIotShadowPendingOperationsMutex ) );
IotListDouble_Remove( &( pOperation->link ) );
IotMutex_Unlock( &( _AwsIotShadowPendingOperationsMutex ) );
}
else
{
IotLogDebug( "Shadow %s PUBLISH message successfully sent.",
_pAwsIotShadowOperationNames[ pOperation->type ] );
}
IOT_FUNCTION_CLEANUP_BEGIN();
/* Free the topic buffer used by this function if it was not assigned to a
* subscription. */
if( ( freeTopicBuffer == true ) && ( pTopicBuffer != NULL ) )
{
AwsIotShadow_FreeString( pTopicBuffer );
}
/* Destroy the Shadow operation on failure. */
if( status != AWS_IOT_SHADOW_SUCCESS )
{
_AwsIotShadow_DestroyOperation( pOperation );
}
else
{
/* Convert successful return code to "status pending", as the Shadow
* library is now waiting for a response from the service. */
status = AWS_IOT_SHADOW_STATUS_PENDING;
}
IOT_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/