/* blob: aa944cf7eb4271e5140686e8edcf51a9770ed9bf [file] [log] [blame] */
/*
* AWS IoT Jobs V1.0.0
* Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file aws_iot_jobs_subscription.c
* @brief Implements functions for interacting with the Jobs library's
* subscription list.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* Jobs internal include. */
#include "private/aws_iot_jobs_internal.h"
/* Error handling include. */
#include "iot_error.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/* MQTT include. */
#include "iot_mqtt.h"
/*-----------------------------------------------------------*/
/**
 * @brief Match a #_jobsSubscription_t against a Thing Name.
 *
 * This is the match function handed to `IotListDouble_FindFirstMatch` when
 * #_AwsIotJobsSubscriptions is searched for a Thing Name.
 *
 * @param[in] pSubscriptionLink Pointer to the link member of a #_jobsSubscription_t
 * containing the Thing Name to check. Must not be `NULL`.
 * @param[in] pMatch Pointer to an `AwsIotThingName_t` holding the Thing Name
 * (pointer and length) to compare against.
 *
 * @return `true` if the lengths are equal and the Thing Names compare equal;
 * `false` otherwise.
 */
static bool _jobsSubscription_match( const IotLink_t * pSubscriptionLink,
void * pMatch );
/*-----------------------------------------------------------*/
/**
 * @brief List of active Jobs subscriptions objects.
 *
 * Elements are #_jobsSubscription_t objects linked through their `link`
 * member; each one is identified by its Thing Name.
 */
IotListDouble_t _AwsIotJobsSubscriptions = { 0 };
/**
 * @brief Protects #_AwsIotJobsSubscriptions from concurrent access.
 *
 * This mutex is not created in this file.
 * NOTE(review): presumably it is created during library initialization and
 * must be held by callers of the internal list functions -- confirm.
 */
IotMutex_t _AwsIotJobsSubscriptionsMutex;
/*-----------------------------------------------------------*/
/* List-search predicate: does the subscription that owns pSubscriptionLink
 * carry the Thing Name described by pMatch (an AwsIotThingName_t)? */
static bool _jobsSubscription_match( const IotLink_t * pSubscriptionLink,
                                     void * pMatch )
{
    const _jobsSubscription_t * pCandidate = NULL;
    const AwsIotThingName_t * pWanted = ( const AwsIotThingName_t * ) pMatch;

    /* The list container invokes this callback only with valid links. */
    AwsIotJobs_Assert( pSubscriptionLink != NULL );

    /* Recover the subscription object that embeds the given link. */
    pCandidate = IotLink_Container( _jobsSubscription_t,
                                    pSubscriptionLink,
                                    link );

    /* Names of different lengths can never be equal; skip the comparison. */
    if( pWanted->thingNameLength != pCandidate->thingNameLength )
    {
        return false;
    }

    /* Lengths agree, so compare the name characters themselves. */
    return strncmp( pWanted->pThingName,
                    pCandidate->pThingName,
                    pWanted->thingNameLength ) == 0;
}
/*-----------------------------------------------------------*/
/**
 * @brief Look up the Jobs subscription object for a Thing Name, optionally
 * creating one when none exists.
 *
 * Searches #_AwsIotJobsSubscriptions using #_jobsSubscription_match. When no
 * entry is found and `createIfNotFound` is `true`, a zeroed
 * #_jobsSubscription_t with `thingNameLength` extra bytes is allocated, the
 * Thing Name is copied into it, and it is inserted at the head of the list.
 *
 * This function does not lock #_AwsIotJobsSubscriptionsMutex itself.
 * NOTE(review): presumably the caller is expected to hold it -- confirm.
 *
 * @param[in] pThingName Thing Name to search for; always read together with
 * `thingNameLength`.
 * @param[in] thingNameLength Length of `pThingName` in bytes.
 * @param[in] createIfNotFound Whether to allocate and insert a new subscription
 * object when the search finds nothing.
 *
 * @return The existing or newly created subscription object; `NULL` if none
 * was found and either `createIfNotFound` is `false` or allocation failed.
 */
_jobsSubscription_t * _AwsIotJobs_FindSubscription( const char * pThingName,
                                                    size_t thingNameLength,
                                                    bool createIfNotFound )
{
    _jobsSubscription_t * pSubscription = NULL;
    IotLink_t * pSubscriptionLink = NULL;
    AwsIotThingName_t thingName = { 0 };

    /* The Thing Name is stored in extra bytes allocated after the struct. */
    const size_t subscriptionSize = sizeof( _jobsSubscription_t ) + thingNameLength;

    thingName.pThingName = pThingName;
    thingName.thingNameLength = thingNameLength;

    /* Search the list for an existing subscription for Thing Name. */
    pSubscriptionLink = IotListDouble_FindFirstMatch( &( _AwsIotJobsSubscriptions ),
                                                      NULL,
                                                      _jobsSubscription_match,
                                                      &thingName );

    if( pSubscriptionLink != NULL )
    {
        /* A "*" precision must be passed as int, so the size_t length is
         * cast explicitly in every log call below. */
        IotLogDebug( "Found existing Jobs subscriptions object for %.*s.",
                     ( int ) thingNameLength,
                     pThingName );

        pSubscription = IotLink_Container( _jobsSubscription_t, pSubscriptionLink, link );
    }
    else if( createIfNotFound == true )
    {
        /* No subscription found. Allocate a new subscription. */
        pSubscription = AwsIotJobs_MallocSubscription( subscriptionSize );

        if( pSubscription != NULL )
        {
            /* Clear the new subscription, including the trailing name bytes. */
            ( void ) memset( pSubscription, 0x00, subscriptionSize );

            /* Set the Thing Name length and copy the Thing Name into the new subscription. */
            pSubscription->thingNameLength = thingNameLength;
            ( void ) memcpy( pSubscription->pThingName, pThingName, thingNameLength );

            /* Add the new subscription to the subscription list. */
            IotListDouble_InsertHead( &( _AwsIotJobsSubscriptions ),
                                      &( pSubscription->link ) );

            IotLogDebug( "Created new Jobs subscriptions object for %.*s.",
                         ( int ) thingNameLength,
                         pThingName );
        }
        else
        {
            IotLogError( "Failed to allocate memory for %.*s Jobs subscriptions.",
                         ( int ) thingNameLength,
                         pThingName );
        }
    }

    return pSubscription;
}
/*-----------------------------------------------------------*/
/**
 * @brief Remove a subscription object from the subscription list if nothing
 * is using it any more.
 *
 * The object is kept (and nothing is modified) when any of the following holds:
 * - an operation reference count is greater than 0;
 * - an operation reference count equals #AWS_IOT_PERSISTENT_SUBSCRIPTION;
 * - `callbackReferences` is greater than 0;
 * - any notify callback slot has a non-NULL function.
 *
 * @param[in] pSubscription Subscription object to check and possibly remove.
 * @param[out] pRemovedSubscription If not `NULL` and the subscription is
 * removed, receives the unlinked object, which is then NOT freed here. It is
 * left unmodified when the subscription is kept. If `NULL`, a removed
 * subscription is freed with #_AwsIotJobs_DestroySubscription.
 */
void _AwsIotJobs_RemoveSubscription( _jobsSubscription_t * pSubscription,
                                     _jobsSubscription_t ** pRemovedSubscription )
{
    /* status == true means "safe to remove"; any check below may clear it. */
    IOT_FUNCTION_ENTRY( bool, true );
    int32_t i = 0;
    int32_t callbackIndex = 0;

    /* A "*" precision must be passed as int, so the Thing Name length is cast
     * explicitly in every log call of this function. */
    IotLogDebug( "Checking if subscription object for %.*s can be removed.",
                 ( int ) pSubscription->thingNameLength,
                 pSubscription->pThingName );

    /* Check for active operations. If any Jobs operation's subscription
     * reference count is not 0, then the subscription cannot be removed. */
    for( i = 0; i < JOBS_OPERATION_COUNT; i++ )
    {
        if( pSubscription->operationReferences[ i ] > 0 )
        {
            IotLogDebug( "Reference count %ld for %.*s subscription object. "
                         "Subscription cannot be removed yet.",
                         ( long int ) pSubscription->operationReferences[ i ],
                         ( int ) pSubscription->thingNameLength,
                         pSubscription->pThingName );

            IOT_SET_AND_GOTO_CLEANUP( false );
        }
        else if( pSubscription->operationReferences[ i ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
        {
            IotLogDebug( "Subscription object for %.*s has persistent subscriptions. "
                         "Subscription will not be removed.",
                         ( int ) pSubscription->thingNameLength,
                         pSubscription->pThingName );

            IOT_SET_AND_GOTO_CLEANUP( false );
        }
    }

    /* Check for active subscriptions. If any Jobs callbacks are active, then the
     * subscription cannot be removed. */
    if( pSubscription->callbackReferences > 0 )
    {
        IotLogDebug( "Notify callbacks are using %.*s subscription object. "
                     "Subscription cannot be removed yet.",
                     ( int ) pSubscription->thingNameLength,
                     pSubscription->pThingName );

        IOT_SET_AND_GOTO_CLEANUP( false );
    }

    /* A registered notify callback also keeps the subscription alive. */
    for( i = 0; i < JOBS_CALLBACK_COUNT; i++ )
    {
        for( callbackIndex = 0; callbackIndex < AWS_IOT_JOBS_NOTIFY_CALLBACKS; callbackIndex++ )
        {
            if( pSubscription->callbacks[ i ][ callbackIndex ].function != NULL )
            {
                IotLogDebug( "Found active Jobs %s callback for %.*s subscription object. "
                             "Subscription cannot be removed yet.",
                             _pAwsIotJobsCallbackNames[ i ],
                             ( int ) pSubscription->thingNameLength,
                             pSubscription->pThingName );

                IOT_SET_AND_GOTO_CLEANUP( false );
            }
        }
    }

    /* Remove the subscription if unused. */
    IOT_FUNCTION_CLEANUP_BEGIN();

    if( status == true )
    {
        /* No Jobs operation subscription references or active Jobs callbacks.
         * Remove the subscription object. */
        IotListDouble_Remove( &( pSubscription->link ) );

        IotLogDebug( "Removed subscription object for %.*s.",
                     ( int ) pSubscription->thingNameLength,
                     pSubscription->pThingName );

        /* If the caller requested the removed subscription, set the output parameter.
         * Otherwise, free the memory used by the subscription. */
        if( pRemovedSubscription != NULL )
        {
            *pRemovedSubscription = pSubscription;
        }
        else
        {
            _AwsIotJobs_DestroySubscription( pSubscription );
        }
    }
}
/*-----------------------------------------------------------*/
void _AwsIotJobs_DestroySubscription( void * pData )
{
_jobsSubscription_t * pSubscription = ( _jobsSubscription_t * ) pData;
/* Free the topic buffer if allocated. */
if( pSubscription->pTopicBuffer != NULL )
{
AwsIotJobs_FreeString( pSubscription->pTopicBuffer );
}
/* Free memory used by subscription. */
AwsIotJobs_FreeSubscription( pSubscription );
}
/*-----------------------------------------------------------*/
/**
 * @brief Take a subscription reference for a Jobs operation, subscribing to
 * the operation's topics when this is the first reference.
 *
 * Nothing is done if the operation type already has persistent subscriptions.
 * Otherwise, when the current reference count is 0, the topics are subscribed
 * through `AwsIot_ModifySubscriptions` with `IotMqtt_SubscribeSync`. After a
 * successful subscribe (or when references already existed) the count is
 * incremented, or replaced by #AWS_IOT_PERSISTENT_SUBSCRIPTION if the
 * operation has #AWS_IOT_JOBS_FLAG_KEEP_SUBSCRIPTIONS set.
 *
 * @param[in] pOperation Operation taking the reference; its `type`,
 * `pSubscription`, `mqttConnection` and `flags` members are read.
 * @param[in] pTopicBuffer Buffer holding the base of the topic filter.
 * @param[in] operationTopicLength Length of the topic filter base in `pTopicBuffer`.
 * @param[in] callback MQTT callback registered for the new subscriptions.
 *
 * @return #AWS_IOT_JOBS_SUCCESS, #AWS_IOT_JOBS_NO_MEMORY or
 * #AWS_IOT_JOBS_MQTT_ERROR. On failure the reference count is unchanged.
 */
AwsIotJobsError_t _AwsIotJobs_IncrementReferences( _jobsOperation_t * pOperation,
                                                   char * pTopicBuffer,
                                                   uint16_t operationTopicLength,
                                                   AwsIotMqttCallbackFunction_t callback )
{
    IOT_FUNCTION_ENTRY( AwsIotJobsError_t, AWS_IOT_JOBS_SUCCESS );
    const _jobsOperationType_t type = pOperation->type;
    _jobsSubscription_t * pSubscription = pOperation->pSubscription;
    IotMqttError_t subscriptionStatus = IOT_MQTT_STATUS_PENDING;
    AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };

    /* Do nothing if this operation has persistent subscriptions. */
    if( pSubscription->operationReferences[ type ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
    {
        /* A "*" precision must be passed as int, so the Thing Name length is
         * cast explicitly in every log call of this function. */
        IotLogDebug( "Jobs %s for %.*s has persistent subscriptions. Reference "
                     "count will not be incremented.",
                     _pAwsIotJobsOperationNames[ type ],
                     ( int ) pSubscription->thingNameLength,
                     pSubscription->pThingName );

        IOT_GOTO_CLEANUP();
    }

    /* When persistent subscriptions are not present, the reference count must
     * not be negative. */
    AwsIotJobs_Assert( pSubscription->operationReferences[ type ] >= 0 );

    /* Check if there are any existing references for this operation. */
    if( pSubscription->operationReferences[ type ] == 0 )
    {
        /* Set the parameters needed to add subscriptions. */
        subscriptionInfo.mqttConnection = pOperation->mqttConnection;
        subscriptionInfo.callbackFunction = callback;
        subscriptionInfo.timeout = _AwsIotJobsMqttTimeoutMs;
        subscriptionInfo.pTopicFilterBase = pTopicBuffer;
        subscriptionInfo.topicFilterBaseLength = operationTopicLength;

        subscriptionStatus = AwsIot_ModifySubscriptions( IotMqtt_SubscribeSync,
                                                         &subscriptionInfo );

        /* Convert MQTT return code to Jobs return code. */
        switch( subscriptionStatus )
        {
            case IOT_MQTT_SUCCESS:
                status = AWS_IOT_JOBS_SUCCESS;
                break;

            case IOT_MQTT_NO_MEMORY:
                status = AWS_IOT_JOBS_NO_MEMORY;
                break;

            default:
                status = AWS_IOT_JOBS_MQTT_ERROR;
                break;
        }

        /* Without subscriptions in place, no reference may be recorded. */
        if( status != AWS_IOT_JOBS_SUCCESS )
        {
            IOT_GOTO_CLEANUP();
        }
    }

    /* Increment the number of subscription references for this operation when
     * the keep subscriptions flag is not set. */
    if( ( pOperation->flags & AWS_IOT_JOBS_FLAG_KEEP_SUBSCRIPTIONS ) == 0 )
    {
        ( pSubscription->operationReferences[ type ] )++;

        /* The count is printed as long int, matching how the same counter is
         * logged elsewhere in this file. */
        IotLogDebug( "Jobs %s subscriptions for %.*s now has count %ld.",
                     _pAwsIotJobsOperationNames[ type ],
                     ( int ) pSubscription->thingNameLength,
                     pSubscription->pThingName,
                     ( long int ) pSubscription->operationReferences[ type ] );
    }
    /* Otherwise, set the persistent subscriptions flag. */
    else
    {
        pSubscription->operationReferences[ type ] = AWS_IOT_PERSISTENT_SUBSCRIPTION;

        IotLogDebug( "Set persistent subscriptions flag for Jobs %s of %.*s.",
                     _pAwsIotJobsOperationNames[ type ],
                     ( int ) pSubscription->thingNameLength,
                     pSubscription->pThingName );
    }

    IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
/**
 * @brief Release a subscription reference held by a Jobs operation,
 * unsubscribing when the last reference is dropped.
 *
 * Nothing is done if the operation type has persistent subscriptions.
 * Otherwise the reference count is decremented; when it reaches 0 the
 * operation's topic is regenerated and the subscriptions are removed through
 * `AwsIot_ModifySubscriptions` with `IotMqtt_UnsubscribeSync` (the result of
 * the unsubscribe is ignored). Finally #_AwsIotJobs_RemoveSubscription is
 * called to remove the subscription object if it is now unused.
 *
 * @param[in] pOperation Operation releasing the reference; its `type`,
 * `pSubscription`, `mqttConnection`, `pJobId` and `jobIdLength` are read.
 * @param[in] pTopicBuffer Buffer used as the topic filter base for the
 * unsubscribe request.
 * @param[out] pRemovedSubscription Forwarded to
 * #_AwsIotJobs_RemoveSubscription; see that function for its meaning.
 */
void _AwsIotJobs_DecrementReferences( _jobsOperation_t * pOperation,
                                      char * pTopicBuffer,
                                      _jobsSubscription_t ** pRemovedSubscription )
{
    const _jobsOperationType_t type = pOperation->type;
    _jobsSubscription_t * pSubscription = pOperation->pSubscription;
    uint16_t operationTopicLength = 0;
    AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
    AwsIotJobsRequestInfo_t requestInfo = AWS_IOT_JOBS_REQUEST_INFO_INITIALIZER;

    /* Do nothing if this Jobs operation has persistent subscriptions. */
    if( pSubscription->operationReferences[ type ] != AWS_IOT_PERSISTENT_SUBSCRIPTION )
    {
        /* Decrement the number of subscription references for this operation.
         * Ensure that it did not become negative. */
        ( pSubscription->operationReferences[ type ] )--;
        AwsIotJobs_Assert( pSubscription->operationReferences[ type ] >= 0 );

        /* Check if the number of references has reached 0. */
        if( pSubscription->operationReferences[ type ] == 0 )
        {
            /* A "*" precision must be passed as int, so the Thing Name length
             * is cast explicitly in every log call of this function. */
            IotLogDebug( "Reference count for %.*s %s is 0. Unsubscribing.",
                         ( int ) pSubscription->thingNameLength,
                         pSubscription->pThingName,
                         _pAwsIotJobsOperationNames[ type ] );

            /* Subscription must have a topic buffer. */
            AwsIotJobs_Assert( pSubscription->pTopicBuffer != NULL );

            /* Set the parameters needed to generate a Jobs topic. */
            requestInfo.pThingName = pSubscription->pThingName;
            requestInfo.thingNameLength = pSubscription->thingNameLength;
            requestInfo.pJobId = pOperation->pJobId;
            requestInfo.jobIdLength = pOperation->jobIdLength;

            /* Generate the prefix of the Jobs topic. This function will not
             * fail when given a buffer. */
            ( void ) _AwsIotJobs_GenerateJobsTopic( ( _jobsOperationType_t ) type,
                                                    &requestInfo,
                                                    &( pSubscription->pTopicBuffer ),
                                                    &operationTopicLength );

            /* Set the parameters needed to remove subscriptions.
             *
             * NOTE(review): the topic above is generated into
             * pSubscription->pTopicBuffer, while the unsubscribe below uses
             * pTopicBuffer. This is only correct if callers pass the
             * subscription's own topic buffer -- confirm against callers. */
            subscriptionInfo.mqttConnection = pOperation->mqttConnection;
            subscriptionInfo.timeout = _AwsIotJobsMqttTimeoutMs;
            subscriptionInfo.pTopicFilterBase = pTopicBuffer;
            subscriptionInfo.topicFilterBaseLength = operationTopicLength;

            /* Best effort: the unsubscribe result is deliberately ignored. */
            ( void ) AwsIot_ModifySubscriptions( IotMqtt_UnsubscribeSync,
                                                 &subscriptionInfo );
        }

        /* Check if this subscription should be deleted. */
        _AwsIotJobs_RemoveSubscription( pSubscription,
                                        pRemovedSubscription );
    }
    else
    {
        IotLogDebug( "Jobs %s for %.*s has persistent subscriptions. Reference "
                     "count will not be decremented.",
                     _pAwsIotJobsOperationNames[ type ],
                     ( int ) pSubscription->thingNameLength,
                     pSubscription->pThingName );
    }
}
/*-----------------------------------------------------------*/
/**
 * @brief Remove the persistent subscriptions of selected Jobs operations for
 * a Thing Name.
 *
 * After validating the parameters, the subscription object for the Thing Name
 * is looked up under #_AwsIotJobsSubscriptionsMutex. For every operation type
 * `i` whose bit `( 0x1UL << i )` is set in `flags` and whose reference count is
 * #AWS_IOT_PERSISTENT_SUBSCRIPTION, the operation's topics are unsubscribed
 * and the persistent flag is cleared. Processing stops at the first failed
 * unsubscribe. Once the loop is done, the subscription object is removed if
 * nothing references it any more.
 *
 * @param[in] pRequestInfo Request parameters. `mqttConnection`, `pThingName`
 * and `thingNameLength` are always required; `pJobId` and `jobIdLength` are
 * required when `flags` contains #AWS_IOT_JOBS_FLAG_REMOVE_DESCRIBE_SUBSCRIPTIONS
 * or #AWS_IOT_JOBS_FLAG_REMOVE_UPDATE_SUBSCRIPTIONS.
 * @param[in] flags Bitmask selecting the operations whose persistent
 * subscriptions should be removed.
 *
 * @return #AWS_IOT_JOBS_SUCCESS (also when no subscription object or no
 * persistent subscription was found), #AWS_IOT_JOBS_BAD_PARAMETER,
 * #AWS_IOT_JOBS_NO_MEMORY or #AWS_IOT_JOBS_MQTT_ERROR.
 */
AwsIotJobsError_t AwsIotJobs_RemovePersistentSubscriptions( const AwsIotJobsRequestInfo_t * pRequestInfo,
                                                            uint32_t flags )
{
    IOT_FUNCTION_ENTRY( AwsIotJobsError_t, AWS_IOT_JOBS_SUCCESS );
    int32_t i = 0;
    uint16_t operationTopicLength = 0;
    bool referencesCleared = false;
    IotMqttError_t unsubscribeStatus = IOT_MQTT_STATUS_PENDING;
    AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
    _jobsSubscription_t * pSubscription = NULL;
    IotLink_t * pSubscriptionLink = NULL;
    AwsIotThingName_t thingName = { 0 };

    thingName.pThingName = pRequestInfo->pThingName;
    thingName.thingNameLength = pRequestInfo->thingNameLength;

    /* Check parameters. */
    if( pRequestInfo->mqttConnection == IOT_MQTT_CONNECTION_INITIALIZER )
    {
        IotLogError( "MQTT connection is not initialized." );

        IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
    }

    if( AwsIot_ValidateThingName( pRequestInfo->pThingName,
                                  pRequestInfo->thingNameLength ) == false )
    {
        IotLogError( "Thing Name is not valid." );

        IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
    }

    /* Describe and Update topics contain a Job ID, so one must be provided
     * when their subscriptions are to be removed. */
    if( ( ( flags & AWS_IOT_JOBS_FLAG_REMOVE_DESCRIBE_SUBSCRIPTIONS ) != 0 ) ||
        ( ( flags & AWS_IOT_JOBS_FLAG_REMOVE_UPDATE_SUBSCRIPTIONS ) != 0 ) )
    {
        if( ( pRequestInfo->pJobId == NULL ) || ( pRequestInfo->jobIdLength == 0 ) )
        {
            IotLogError( "Job ID must be set." );

            IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
        }

        if( pRequestInfo->jobIdLength > JOBS_MAX_ID_LENGTH )
        {
            IotLogError( "Job ID cannot be longer than %d.",
                         JOBS_MAX_ID_LENGTH );

            IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
        }
    }

    /* Log only after the Thing Name passed validation, so an unvalidated
     * pointer/length pair is never handed to the logger. A "*" precision must
     * be passed as int, hence the explicit casts in this function's logs. */
    IotLogInfo( "Removing persistent subscriptions for %.*s.",
                ( int ) pRequestInfo->thingNameLength,
                pRequestInfo->pThingName );

    IotMutex_Lock( &( _AwsIotJobsSubscriptionsMutex ) );

    /* Search the list for an existing subscription for Thing Name. */
    pSubscriptionLink = IotListDouble_FindFirstMatch( &( _AwsIotJobsSubscriptions ),
                                                      NULL,
                                                      _jobsSubscription_match,
                                                      &thingName );

    if( pSubscriptionLink != NULL )
    {
        IotLogDebug( "Found subscription object for %.*s. Checking for persistent "
                     "subscriptions to remove.",
                     ( int ) pRequestInfo->thingNameLength,
                     pRequestInfo->pThingName );

        pSubscription = IotLink_Container( _jobsSubscription_t, pSubscriptionLink, link );

        for( i = 0; i < JOBS_OPERATION_COUNT; i++ )
        {
            /* Bit i of flags selects operation type i. */
            if( ( flags & ( 0x1UL << i ) ) != 0 )
            {
                IotLogDebug( "Removing %.*s %s persistent subscriptions.",
                             ( int ) pRequestInfo->thingNameLength,
                             pRequestInfo->pThingName,
                             _pAwsIotJobsOperationNames[ i ] );

                /* Subscription must have a topic buffer. */
                AwsIotJobs_Assert( pSubscription->pTopicBuffer != NULL );

                if( pSubscription->operationReferences[ i ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
                {
                    /* Generate the prefix of the Jobs topic. This function will not
                     * fail when given a buffer. */
                    ( void ) _AwsIotJobs_GenerateJobsTopic( ( _jobsOperationType_t ) i,
                                                            pRequestInfo,
                                                            &( pSubscription->pTopicBuffer ),
                                                            &operationTopicLength );

                    /* Set the parameters needed to remove subscriptions. */
                    subscriptionInfo.mqttConnection = pRequestInfo->mqttConnection;
                    subscriptionInfo.timeout = _AwsIotJobsMqttTimeoutMs;
                    subscriptionInfo.pTopicFilterBase = pSubscription->pTopicBuffer;
                    subscriptionInfo.topicFilterBaseLength = operationTopicLength;

                    unsubscribeStatus = AwsIot_ModifySubscriptions( IotMqtt_UnsubscribeSync,
                                                                    &subscriptionInfo );

                    /* Convert MQTT return code to Jobs return code. */
                    switch( unsubscribeStatus )
                    {
                        case IOT_MQTT_SUCCESS:
                            status = AWS_IOT_JOBS_SUCCESS;
                            break;

                        case IOT_MQTT_NO_MEMORY:
                            status = AWS_IOT_JOBS_NO_MEMORY;
                            break;

                        default:
                            status = AWS_IOT_JOBS_MQTT_ERROR;
                            break;
                    }

                    /* Stop at the first failure; this operation keeps its
                     * persistent flag. */
                    if( status != AWS_IOT_JOBS_SUCCESS )
                    {
                        break;
                    }

                    /* Clear the persistent subscriptions flag. The removal
                     * check is deferred until after the loop: removing (and
                     * freeing) the object here would make later iterations
                     * read freed memory. */
                    pSubscription->operationReferences[ i ] = 0;
                    referencesCleared = true;
                }
                else
                {
                    IotLogDebug( "%.*s %s does not have persistent subscriptions.",
                                 ( int ) pRequestInfo->thingNameLength,
                                 pRequestInfo->pThingName,
                                 _pAwsIotJobsOperationNames[ i ] );
                }
            }
        }

        /* If any persistent flag was cleared, check once whether the
         * subscription object is now unused and can be removed. pSubscription
         * must not be touched after this call, as it may have been freed. */
        if( referencesCleared == true )
        {
            _AwsIotJobs_RemoveSubscription( pSubscription, NULL );
            pSubscription = NULL;
        }
    }
    else
    {
        IotLogWarn( "No subscription object found for %.*s",
                    ( int ) pRequestInfo->thingNameLength,
                    pRequestInfo->pThingName );
    }

    IotMutex_Unlock( &( _AwsIotJobsSubscriptionsMutex ) );

    IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/