blob: 882939942a9e4450bfd1928a423161cb7de1e3df [file] [log] [blame]
/*
* AWS IoT Shadow V2.1.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file aws_iot_shadow_subscription.c
* @brief Implements functions for interacting with the Shadow library's
* subscription list.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* Shadow internal include. */
#include "private/aws_iot_shadow_internal.h"
/* Error handling include. */
#include "iot_error.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/* MQTT include. */
#include "iot_mqtt.h"
/*-----------------------------------------------------------*/
/**
* @brief Match two #_shadowSubscription_t by Thing Name.
*
* @param[in] pSubscriptionLink Pointer to the link member of a #_shadowSubscription_t
* containing the Thing Name to check.
* @param[in] pMatch Pointer to an `AwsIotThingName_t`.
*
* @return `true` if the Thing Names match; `false` otherwise.
*/
static bool _shadowSubscription_match( const IotLink_t * pSubscriptionLink,
void * pMatch );
/*-----------------------------------------------------------*/
/**
* @brief List of active Shadow subscriptions objects.
*/
IotListDouble_t _AwsIotShadowSubscriptions = { 0 };
/**
* @brief Protects #_AwsIotShadowSubscriptions from concurrent access.
*/
IotMutex_t _AwsIotShadowSubscriptionsMutex;
/*-----------------------------------------------------------*/
static bool _shadowSubscription_match( const IotLink_t * pSubscriptionLink,
void * pMatch )
{
bool match = false;
/* Because this function is called from a container function, the given link
* must never be NULL. */
AwsIotShadow_Assert( pSubscriptionLink != NULL );
const _shadowSubscription_t * pSubscription = IotLink_Container( _shadowSubscription_t,
pSubscriptionLink,
link );
const AwsIotThingName_t * pThingName = ( AwsIotThingName_t * ) pMatch;
if( pThingName->thingNameLength == pSubscription->thingNameLength )
{
/* Check for matching Thing Names. */
match = ( strncmp( pThingName->pThingName,
pSubscription->pThingName,
pThingName->thingNameLength ) == 0 );
}
return match;
}
/*-----------------------------------------------------------*/
_shadowSubscription_t * _AwsIotShadow_FindSubscription( const char * pThingName,
size_t thingNameLength,
bool createIfNotFound )
{
_shadowSubscription_t * pSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
AwsIotThingName_t thingName = { 0 };
thingName.pThingName = pThingName;
thingName.thingNameLength = thingNameLength;
/* Search the list for an existing subscription for Thing Name. */
pSubscriptionLink = IotListDouble_FindFirstMatch( &( _AwsIotShadowSubscriptions ),
NULL,
_shadowSubscription_match,
&thingName );
/* Check if a subscription was found. */
if( pSubscriptionLink == NULL )
{
if( createIfNotFound == true )
{
/* No subscription found. Allocate a new subscription. */
pSubscription = AwsIotShadow_MallocSubscription( sizeof( _shadowSubscription_t ) + thingNameLength );
if( pSubscription != NULL )
{
/* Clear the new subscription. */
( void ) memset( pSubscription, 0x00, sizeof( _shadowSubscription_t ) + thingNameLength );
/* Set the Thing Name length and copy the Thing Name into the new subscription. */
pSubscription->thingNameLength = thingNameLength;
( void ) memcpy( pSubscription->pThingName, pThingName, thingNameLength );
/* Add the new subscription to the subscription list. */
IotListDouble_InsertHead( &( _AwsIotShadowSubscriptions ),
&( pSubscription->link ) );
IotLogDebug( "Created new Shadow subscriptions object for %.*s.",
thingNameLength,
pThingName );
}
else
{
IotLogError( "Failed to allocate memory for %.*s Shadow subscriptions.",
thingNameLength,
pThingName );
}
}
}
else
{
IotLogDebug( "Found existing Shadow subscriptions object for %.*s.",
thingNameLength,
pThingName );
pSubscription = IotLink_Container( _shadowSubscription_t, pSubscriptionLink, link );
}
return pSubscription;
}
/*-----------------------------------------------------------*/
void _AwsIotShadow_RemoveSubscription( _shadowSubscription_t * pSubscription,
_shadowSubscription_t ** pRemovedSubscription )
{
int32_t i = 0;
bool removeSubscription = true;
IotLogDebug( "Checking if subscription object for %.*s can be removed.",
pSubscription->thingNameLength,
pSubscription->pThingName );
/* Check for active operations. If any Shadow operation's subscription
* reference count is not 0, then the subscription cannot be removed. */
for( i = 0; i < SHADOW_OPERATION_COUNT; i++ )
{
if( pSubscription->references[ i ] > 0 )
{
IotLogDebug( "Reference count %ld for %.*s subscription object. "
"Subscription cannot be removed yet.",
( long int ) pSubscription->references[ i ],
pSubscription->thingNameLength,
pSubscription->pThingName );
removeSubscription = false;
}
else if( pSubscription->references[ i ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
{
IotLogDebug( "Subscription object for %.*s has persistent subscriptions. "
"Subscription will not be removed.",
pSubscription->thingNameLength,
pSubscription->pThingName );
removeSubscription = false;
}
if( removeSubscription == false )
{
break;
}
}
/* Check for active subscriptions. If any Shadow callbacks are active, then the
* subscription cannot be removed. */
if( removeSubscription == true )
{
for( i = 0; i < SHADOW_CALLBACK_COUNT; i++ )
{
if( pSubscription->callbacks[ i ].function != NULL )
{
IotLogDebug( "Found active Shadow %s callback for %.*s subscription object. "
"Subscription cannot be removed yet.",
_pAwsIotShadowCallbackNames[ i ],
pSubscription->thingNameLength,
pSubscription->pThingName );
removeSubscription = false;
break;
}
}
}
/* Remove the subscription if unused. */
if( removeSubscription == true )
{
/* No Shadow operation subscription references or active Shadow callbacks.
* Remove the subscription object. */
IotListDouble_Remove( &( pSubscription->link ) );
IotLogDebug( "Removed subscription object for %.*s.",
pSubscription->thingNameLength,
pSubscription->pThingName );
/* If the caller requested the removed subscription, set the output parameter.
* Otherwise, free the memory used by the subscription. */
if( pRemovedSubscription != NULL )
{
*pRemovedSubscription = pSubscription;
}
else
{
_AwsIotShadow_DestroySubscription( pSubscription );
}
}
}
/*-----------------------------------------------------------*/
void _AwsIotShadow_DestroySubscription( void * pData )
{
_shadowSubscription_t * pSubscription = ( _shadowSubscription_t * ) pData;
/* Free the topic buffer if allocated. */
if( pSubscription->pTopicBuffer != NULL )
{
AwsIotShadow_FreeString( pSubscription->pTopicBuffer );
}
/* Free memory used by subscription. */
AwsIotShadow_FreeSubscription( pSubscription );
}
/*-----------------------------------------------------------*/
AwsIotShadowError_t _AwsIotShadow_IncrementReferences( _shadowOperation_t * pOperation,
char * pTopicBuffer,
uint16_t operationTopicLength,
AwsIotMqttCallbackFunction_t callback )
{
IOT_FUNCTION_ENTRY( AwsIotShadowError_t, AWS_IOT_SHADOW_SUCCESS );
const _shadowOperationType_t type = pOperation->type;
_shadowSubscription_t * pSubscription = pOperation->pSubscription;
IotMqttError_t subscriptionStatus = IOT_MQTT_STATUS_PENDING;
AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
/* Do nothing if this operation has persistent subscriptions. */
if( pSubscription->references[ type ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
{
IotLogDebug( "Shadow %s for %.*s has persistent subscriptions. Reference "
"count will not be incremented.",
_pAwsIotShadowOperationNames[ type ],
pSubscription->thingNameLength,
pSubscription->pThingName );
IOT_GOTO_CLEANUP();
}
/* When persistent subscriptions are not present, the reference count must
* not be negative. */
AwsIotShadow_Assert( pSubscription->references[ type ] >= 0 );
/* Check if there are any existing references for this operation. */
if( pSubscription->references[ type ] == 0 )
{
/* Set the parameters needed to add subscriptions. */
subscriptionInfo.mqttConnection = pOperation->mqttConnection;
subscriptionInfo.callbackFunction = callback;
subscriptionInfo.timeout = _AwsIotShadowMqttTimeoutMs;
subscriptionInfo.pTopicFilterBase = pTopicBuffer;
subscriptionInfo.topicFilterBaseLength = operationTopicLength;
subscriptionStatus = AwsIot_ModifySubscriptions( IotMqtt_SubscribeSync,
&subscriptionInfo );
/* Convert MQTT return code to Shadow return code. */
status = SHADOW_CONVERT_STATUS_CODE_MQTT_TO_SHADOW( subscriptionStatus );
if( status != AWS_IOT_SHADOW_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
}
/* Increment the number of subscription references for this operation when
* the keep subscriptions flag is not set. */
if( ( pOperation->flags & AWS_IOT_SHADOW_FLAG_KEEP_SUBSCRIPTIONS ) == 0 )
{
( pSubscription->references[ type ] )++;
IotLogDebug( "Shadow %s subscriptions for %.*s now has count %d.",
_pAwsIotShadowOperationNames[ type ],
pSubscription->thingNameLength,
pSubscription->pThingName,
pSubscription->references[ type ] );
}
/* Otherwise, set the persistent subscriptions flag. */
else
{
pSubscription->references[ type ] = AWS_IOT_PERSISTENT_SUBSCRIPTION;
IotLogDebug( "Set persistent subscriptions flag for Shadow %s of %.*s.",
_pAwsIotShadowOperationNames[ type ],
pSubscription->thingNameLength,
pSubscription->pThingName );
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
void _AwsIotShadow_DecrementReferences( _shadowOperation_t * pOperation,
char * pTopicBuffer,
_shadowSubscription_t ** pRemovedSubscription )
{
const _shadowOperationType_t type = pOperation->type;
_shadowSubscription_t * pSubscription = pOperation->pSubscription;
uint16_t operationTopicLength = 0;
AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
/* Do nothing if this Shadow operation has persistent subscriptions. */
if( pSubscription->references[ type ] != AWS_IOT_PERSISTENT_SUBSCRIPTION )
{
/* Decrement the number of subscription references for this operation.
* Ensure that it's positive. */
( pSubscription->references[ type ] )--;
AwsIotShadow_Assert( pSubscription->references[ type ] >= 0 );
/* Check if the number of references has reached 0. */
if( pSubscription->references[ type ] == 0 )
{
IotLogDebug( "Reference count for %.*s %s is 0. Unsubscribing.",
pSubscription->thingNameLength,
pSubscription->pThingName,
_pAwsIotShadowOperationNames[ type ] );
/* Subscription must have a topic buffer. */
AwsIotShadow_Assert( pSubscription->pTopicBuffer != NULL );
/* Generate the prefix of the Shadow topic. This function will not
* fail when given a buffer. */
( void ) _AwsIotShadow_GenerateShadowTopic( ( _shadowOperationType_t ) type,
pSubscription->pThingName,
pSubscription->thingNameLength,
&( pSubscription->pTopicBuffer ),
&operationTopicLength );
/* Set the parameters needed to remove subscriptions. */
subscriptionInfo.mqttConnection = pOperation->mqttConnection;
subscriptionInfo.timeout = _AwsIotShadowMqttTimeoutMs;
subscriptionInfo.pTopicFilterBase = pTopicBuffer;
subscriptionInfo.topicFilterBaseLength = operationTopicLength;
( void ) AwsIot_ModifySubscriptions( IotMqtt_UnsubscribeSync,
&subscriptionInfo );
}
/* Check if this subscription should be deleted. */
_AwsIotShadow_RemoveSubscription( pSubscription,
pRemovedSubscription );
}
else
{
IotLogDebug( "Shadow %s for %.*s has persistent subscriptions. Reference "
"count will not be decremented.",
_pAwsIotShadowOperationNames[ type ],
pSubscription->thingNameLength,
pSubscription->pThingName );
}
}
/*-----------------------------------------------------------*/
AwsIotShadowError_t AwsIotShadow_RemovePersistentSubscriptions( IotMqttConnection_t mqttConnection,
const char * pThingName,
size_t thingNameLength,
uint32_t flags )
{
int32_t i = 0;
uint16_t operationTopicLength = 0;
AwsIotShadowError_t status = AWS_IOT_SHADOW_STATUS_PENDING;
IotMqttError_t unsubscribeStatus = IOT_MQTT_STATUS_PENDING;
AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
_shadowSubscription_t * pSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
AwsIotThingName_t thingName = { 0 };
thingName.pThingName = pThingName;
thingName.thingNameLength = thingNameLength;
IotLogInfo( "Removing persistent subscriptions for %.*s.",
thingNameLength,
pThingName );
IotMutex_Lock( &( _AwsIotShadowSubscriptionsMutex ) );
/* Search the list for an existing subscription for Thing Name. */
pSubscriptionLink = IotListDouble_FindFirstMatch( &( _AwsIotShadowSubscriptions ),
NULL,
_shadowSubscription_match,
&thingName );
/* Unsubscribe from operation subscriptions if found. */
if( pSubscriptionLink != NULL )
{
IotLogDebug( "Found subscription object for %.*s. Checking for persistent "
"subscriptions to remove.",
thingNameLength,
pThingName );
pSubscription = IotLink_Container( _shadowSubscription_t, pSubscriptionLink, link );
for( i = 0; i < SHADOW_OPERATION_COUNT; i++ )
{
if( ( flags & ( 0x1UL << i ) ) != 0 )
{
IotLogDebug( "Removing %.*s %s persistent subscriptions.",
thingNameLength,
pThingName,
_pAwsIotShadowOperationNames[ i ] );
/* Subscription must have a topic buffer. */
AwsIotShadow_Assert( pSubscription->pTopicBuffer != NULL );
if( pSubscription->references[ i ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
{
/* Generate the prefix of the Shadow topic. This function will not
* fail when given a buffer. */
( void ) _AwsIotShadow_GenerateShadowTopic( ( _shadowOperationType_t ) i,
pThingName,
thingNameLength,
&( pSubscription->pTopicBuffer ),
&operationTopicLength );
/* Set the parameters needed to remove subscriptions. */
subscriptionInfo.mqttConnection = mqttConnection;
subscriptionInfo.timeout = _AwsIotShadowMqttTimeoutMs;
subscriptionInfo.pTopicFilterBase = pSubscription->pTopicBuffer;
subscriptionInfo.topicFilterBaseLength = operationTopicLength;
unsubscribeStatus = AwsIot_ModifySubscriptions( IotMqtt_UnsubscribeSync,
&subscriptionInfo );
/* Convert MQTT return code to Shadow return code. */
status = SHADOW_CONVERT_STATUS_CODE_MQTT_TO_SHADOW( unsubscribeStatus );
if( status != AWS_IOT_SHADOW_SUCCESS )
{
break;
}
/* Clear the persistent subscriptions flag and check if the
* subscription can be removed. */
pSubscription->references[ i ] = 0;
_AwsIotShadow_RemoveSubscription( pSubscription, NULL );
}
else
{
IotLogDebug( "%.*s %s does not have persistent subscriptions.",
thingNameLength,
pThingName,
_pAwsIotShadowOperationNames[ i ] );
}
}
}
}
else
{
IotLogWarn( "No subscription object found for %.*s",
thingNameLength,
pThingName );
}
IotMutex_Unlock( &( _AwsIotShadowSubscriptionsMutex ) );
return status;
}
/*-----------------------------------------------------------*/