/*
 * IoT MQTT V2.1.0
 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

/**
 * @file iot_mqtt_operation.c
 * @brief Implements functions that process MQTT operations.
 */

/* The config header is always included first. */
#include "iot_config.h"

/* Standard includes. */
#include <string.h>

/* Error handling include. */
#include "iot_error.h"

/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"

/* Platform layer includes. */
#include "platform/iot_clock.h"
#include "platform/iot_threads.h"

/* Atomics include. */
#include "iot_atomic.h"

/**
 * @cond DOXYGEN_IGNORE
 * Doxygen should ignore this section.
 *
 * Declaration of local MQTT serializer override selectors
 */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttPublishSetDup_t,
                                   _getMqttPublishSetDupFunc,
                                   _IotMqtt_PublishSetDup,
                                   serialize.publishSetDup )
    _SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t,
                                   _getMqttFreePacketFunc,
                                   _IotMqtt_FreePacket,
                                   freePacket )
#else  /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
    #define _getMqttFreePacketFunc( pSerializer )       _IotMqtt_FreePacket
    #define _getMqttPublishSetDupFunc( pSerializer )    _IotMqtt_PublishSetDup
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/** @endcond */

/*-----------------------------------------------------------*/

/**
 * @brief First parameter to #_mqttOperation_match.
 */
typedef struct _operationMatchParam
{
    IotMqttOperationType_t type;        /**< @brief The type of operation to look for. */
    const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.
                                         * Set to `NULL` to ignore packet identifier. */
} _operationMatchParam_t;

/*-----------------------------------------------------------*/

/**
 * @brief Match an MQTT operation by type and packet identifier.
 *
 * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.
 * @param[in] pMatch Pointer to an #_operationMatchParam_t.
 *
 * @return `true` if the operation matches the parameters in `pArgument`; `false`
 * otherwise.
 */
static bool _mqttOperation_match( const IotLink_t * pOperationLink,
                                  void * pMatch );

/**
 * @brief Check if an operation with retry has exceeded its retry limit.
 *
 * If a PUBLISH operation is available for retry, this function also sets any
 * necessary DUP flags.
 *
 * @param[in] pOperation The operation to check.
 *
 * @return `true` if the operation may be retried; `false` otherwise.
 */
static bool _checkRetryLimit( _mqttOperation_t * pOperation );

/**
 * @brief Schedule the next send of an operation with retry.
 *
 * @param[in] pOperation The operation to schedule.
 *
 * @return `true` if the reschedule succeeded; `false` otherwise.
 */
static bool _scheduleNextRetry( _mqttOperation_t * pOperation );

/*-----------------------------------------------------------*/

static bool _mqttOperation_match( const IotLink_t * pOperationLink,
                                  void * pMatch )
{
    bool match = false;

    /* Because this function is called from a container function, the given link
     * must never be NULL. */
    IotMqtt_Assert( pOperationLink != NULL );

    _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,
                                                       pOperationLink,
                                                       link );
    _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;

    /* Check for matching operations. */
    if( pParam->type == pOperation->u.operation.type )
    {
        /* Check for matching packet identifiers. */
        if( pParam->pPacketIdentifier == NULL )
        {
            match = true;
        }
        else
        {
            match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    return match;
}

/*-----------------------------------------------------------*/

static bool _checkRetryLimit( _mqttOperation_t * pOperation )
{
    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
    bool status = true, setDup = false;

    /* Only PUBLISH may be retried. */
    IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );

    /* Check if the retry limit is exhausted. */
    if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )
    {
        /* The retry count may be at most one more than the retry limit, which
         * accounts for the final check for a PUBACK. */
        IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==
                        pOperation->u.operation.periodic.retry.limit + 1 );

        IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",
                     pMqttConnection,
                     pOperation,
                     pOperation->u.operation.periodic.retry.limit );

        status = false;
    }
    else
    {
        if( pOperation->u.operation.periodic.retry.count == 1 )
        {
            /* The DUP flag should always be set on the first retry. */
            setDup = true;
        }
        else if( pMqttConnection->awsIotMqttMode == true )
        {
            /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
             * identifier) must be reset on every retry. */
            setDup = true;
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        if( setDup == true )
        {
            /* In AWS IoT MQTT mode, the references mutex must be locked to
             * prevent the packet identifier from being read while it is being
             * changed. */
            if( pMqttConnection->awsIotMqttMode == true )
            {
                IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }

            /* Set the DUP flag */
            _getMqttPublishSetDupFunc( pMqttConnection->pSerializer )( pOperation->u.operation.pMqttPacket,
                                                                       pOperation->u.operation.pPacketIdentifierHigh,
                                                                       &( pOperation->u.operation.packetIdentifier ) );

            if( pMqttConnection->awsIotMqttMode == true )
            {
                IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }

    return status;
}

/*-----------------------------------------------------------*/

static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
{
    bool firstRetry = false;
    uint32_t scheduleDelay = 0;
    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;

    /* This function should never be called with retry count greater than
     * retry limit. */
    IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=
                    pOperation->u.operation.periodic.retry.limit );

    /* Increment the retry count. */
    ( pOperation->u.operation.periodic.retry.count )++;

    /* Check for a response shortly for the final retry. Otherwise, calculate the
     * next retry period. */
    if( pOperation->u.operation.periodic.retry.count >
        pOperation->u.operation.periodic.retry.limit )
    {
        scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;

        IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "
                     "for response in %d ms.",
                     pMqttConnection,
                     pOperation,
                     IOT_MQTT_RESPONSE_WAIT_MS );
    }
    else
    {
        scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;

        /* Double the retry period, subject to a ceiling value. */
        pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;

        if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )
        {
            pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",
                     pMqttConnection,
                     pOperation,
                     ( unsigned long ) pOperation->u.operation.periodic.retry.count,
                     ( unsigned long ) pOperation->u.operation.periodic.retry.limit,
                     ( unsigned long ) scheduleDelay );

        /* Check if this is the first retry. */
        firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );

        /* On the first retry, the PUBLISH will be moved from the pending processing
         * list to the pending responses list. Lock the connection references mutex
         * to manipulate the lists. */
        if( firstRetry == true )
        {
            IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }

    /* Reschedule the PUBLISH for another send. */
    status = _IotMqtt_ScheduleOperation( pOperation,
                                         _IotMqtt_ProcessSend,
                                         scheduleDelay );

    /* Check for successful reschedule. */
    if( status == IOT_MQTT_SUCCESS )
    {
        /* Move a successfully rescheduled PUBLISH from the pending processing
         * list to the pending responses list on the first retry. */
        if( firstRetry == true )
        {
            if( IotLink_IsLinked( &( pOperation->link ) ) == true )
            {
                IotListDouble_Remove( &( pOperation->link ) );
            }

            IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
                                      &( pOperation->link ) );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* The references mutex only needs to be unlocked on the first retry, since
     * only the first retry manipulates the connection lists. */
    if( firstRetry == true )
    {
        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    return( status == IOT_MQTT_SUCCESS );
}

/*-----------------------------------------------------------*/

IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
                                         uint32_t flags,
                                         const IotMqttCallbackInfo_t * pCallbackInfo,
                                         _mqttOperation_t ** pNewOperation )
{
    IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
    bool decrementOnError = false;
    _mqttOperation_t * pOperation = NULL;
    bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );

    /* If the waitable flag is set, make sure that there's no callback. */
    if( waitable == true )
    {
        if( pCallbackInfo != NULL )
        {
            IotLogError( "Callback should not be set for a waitable operation." );

            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IotLogDebug( "(MQTT connection %p) Creating new operation record.",
                 pMqttConnection );

    /* Increment the reference count for the MQTT connection when creating a new
     * operation. */
    if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )
    {
        IotLogError( "(MQTT connection %p) New operation record cannot be created"
                     " for a closed connection",
                     pMqttConnection );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
    }
    else
    {
        /* Reference count will need to be decremented on error. */
        decrementOnError = true;
    }

    /* Allocate memory for a new operation. */
    pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );

    if( pOperation == NULL )
    {
        IotLogError( "(MQTT connection %p) Failed to allocate memory for new "
                     "operation record.",
                     pMqttConnection );

        IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
    }
    else
    {
        /* Clear the operation data. */
        ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );

        /* Initialize some members of the new operation. */
        pOperation->pMqttConnection = pMqttConnection;
        pOperation->u.operation.jobReference = 1;
        pOperation->u.operation.flags = flags;
        pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;
    }

    /* Check if the waitable flag is set. If it is, create a semaphore to
     * wait on. */
    if( waitable == true )
    {
        /* Create a semaphore to wait on for a waitable operation. */
        if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )
        {
            IotLogError( "(MQTT connection %p) Failed to create semaphore for "
                         "waitable operation.",
                         pMqttConnection );

            IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
        }
        else
        {
            /* A waitable operation is created with an additional reference for the
             * Wait function. */
            ( pOperation->u.operation.jobReference )++;
        }
    }
    else
    {
        /* If the waitable flag isn't set but a callback is, copy the callback
         * information. */
        if( pCallbackInfo != NULL )
        {
            pOperation->u.operation.notify.callback = *pCallbackInfo;
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }

    /* Add this operation to the MQTT connection's operation list. */
    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
    IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
                              &( pOperation->link ) );
    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );

    /* Set the output parameter. */
    *pNewOperation = pOperation;

    /* Clean up operation and decrement reference count if this function failed. */
    IOT_FUNCTION_CLEANUP_BEGIN();

    if( status != IOT_MQTT_SUCCESS )
    {
        if( decrementOnError == true )
        {
            _IotMqtt_DecrementConnectionReferences( pMqttConnection );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        if( pOperation != NULL )
        {
            IotMqtt_FreeOperation( pOperation );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IOT_FUNCTION_CLEANUP_END();
}

/*-----------------------------------------------------------*/

bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
                                            bool cancelJob )
{
    bool destroyOperation = false;
    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;

    /* Attempt to cancel the operation's job. */
    if( cancelJob == true )
    {
        taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
                                                pOperation->job,
                                                NULL );

        if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
        {
            IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",
                         pMqttConnection,
                         IotMqtt_OperationType( pOperation->u.operation.type ),
                         pOperation );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Decrement job reference count. */
    if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
    {
        IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
        pOperation->u.operation.jobReference--;

        IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"
                     " from %ld to %ld.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation,
                     ( long ) ( pOperation->u.operation.jobReference + 1 ),
                     ( long ) ( pOperation->u.operation.jobReference ) );

        /* The job reference count must be 0 or 1 after the decrement. */
        IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||
                        ( pOperation->u.operation.jobReference == 1 ) );

        /* This operation may be destroyed if its reference count is 0. */
        if( pOperation->u.operation.jobReference == 0 )
        {
            destroyOperation = true;
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }

        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    return destroyOperation;
}

/*-----------------------------------------------------------*/

void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )
{
    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;

    IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",
                 pMqttConnection,
                 IotMqtt_OperationType( pOperation->u.operation.type ),
                 pOperation );

    /* The job reference count must be between 0 and 2. */
    IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&
                    ( pOperation->u.operation.jobReference <= 2 ) );

    /* Jobs to be destroyed should be removed from the MQTT connection's
     * lists. */
    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

    if( IotLink_IsLinked( &( pOperation->link ) ) == true )
    {
        IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation,
                     pMqttConnection );

        IotListDouble_Remove( &( pOperation->link ) );
    }
    else
    {
        IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation );
    }

    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );

    /* Free any allocated MQTT packet. */
    if( pOperation->u.operation.pMqttPacket != NULL )
    {
        _getMqttFreePacketFunc( pMqttConnection->pSerializer )( pOperation->u.operation.pMqttPacket );

        IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation );
    }
    else
    {
        IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation );
    }

    /* Check if a wait semaphore was created for this operation. */
    if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
    {
        IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );

        IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",
                 pMqttConnection,
                 IotMqtt_OperationType( pOperation->u.operation.type ),
                 pOperation );

    /* Free the memory used to hold operation data. */
    IotMqtt_FreeOperation( pOperation );

    /* Decrement the MQTT connection's reference count after destroying an
     * operation. */
    _IotMqtt_DecrementConnectionReferences( pMqttConnection );
}

/*-----------------------------------------------------------*/

void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
                                IotTaskPoolJob_t pKeepAliveJob,
                                void * pContext )
{
    bool status = true;
    uint32_t swapStatus = 0;
    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
    size_t bytesSent = 0;

    /* Swap status is not checked when asserts are disabled. */
    ( void ) swapStatus;

    /* Retrieve the MQTT connection from the context. */
    _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;
    _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );

    /* Check parameters. */
    IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );

    /* Check that keep-alive interval is valid. The MQTT spec states its maximum
     * value is 65,535 seconds. */
    IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );

    /* Only two values are valid for the next keep alive job delay. */
    IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
                      pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||
                    ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs
                      == IOT_MQTT_RESPONSE_WAIT_MS ) );

    IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );

    /* Determine whether to send a PINGREQ or check for PINGRESP. */
    if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
        pPingreqOperation->u.operation.periodic.ping.keepAliveMs )
    {
        IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );

        /* Because PINGREQ may be used to keep the MQTT connection alive, it is
         * more important than other operations. Bypass the queue of jobs for
         * operations by directly sending the PINGREQ in this job. */
        bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
                                                              pPingreqOperation->u.operation.pMqttPacket,
                                                              pPingreqOperation->u.operation.packetSize );

        if( bytesSent != pPingreqOperation->u.operation.packetSize )
        {
            IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );
            status = false;
        }
        else
        {
            /* Assume the keep-alive will fail. The network receive callback will
             * clear the failure flag upon receiving a PINGRESP. */
            swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),
                                                    1,
                                                    0 );
            IotMqtt_Assert( swapStatus == 1 );

            /* Schedule a check for PINGRESP. */
            pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;

            IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",
                         pMqttConnection,
                         IOT_MQTT_RESPONSE_WAIT_MS );
        }
    }
    else
    {
        IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );

        if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )
        {
            IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );

            /* PINGRESP was received. Schedule the next PINGREQ transmission. */
            pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =
                pPingreqOperation->u.operation.periodic.ping.keepAliveMs;
        }
        else
        {
            IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",
                         pMqttConnection,
                         IOT_MQTT_RESPONSE_WAIT_MS );

            /* The network receive callback did not clear the failure flag. */
            status = false;
        }
    }

    /* When a PINGREQ is successfully sent, reschedule this job to check for a
     * response shortly. */
    if( status == true )
    {
        IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

        /* Re-create the keep-alive job for rescheduling. This should never fail. */
        taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
                                                pContext,
                                                IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),
                                                &pKeepAliveJob );
        IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );

        taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,
                                                       pKeepAliveJob,
                                                       pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );

        if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
        {
            IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",
                         pMqttConnection,
                         ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
        }
        else
        {
            IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",
                         pMqttConnection,
                         IotTaskPool_strerror( taskPoolStatus ) );

            status = false;
        }

        IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Close the connection on failures. */
    if( status == false )
    {
        _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,
                                         pMqttConnection );

        /* Keep-alive has failed and will no longer use this MQTT connection. */
        _IotMqtt_DecrementConnectionReferences( pMqttConnection );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }
}

/*-----------------------------------------------------------*/

void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
                                      IotTaskPoolJob_t pPublishJob,
                                      void * pContext )
{
    _mqttOperation_t * pOperation = pContext;
    IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };

    /* Check parameters. The task pool and job parameter is not used when asserts
     * are disabled. */
    ( void ) pTaskPool;
    ( void ) pPublishJob;
    IotMqtt_Assert( pOperation->incomingPublish == true );
    IotMqtt_Assert( pPublishJob == pOperation->job );

    /* Remove the operation from the pending processing list. */
    IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );

    if( IotLink_IsLinked( &( pOperation->link ) ) == true )
    {
        IotListDouble_Remove( &( pOperation->link ) );
    }
    else
    {
        /* This operation may have already been removed by cleanup of pending
         * operations (called from Disconnect). In that case, do nothing here. */
        EMPTY_ELSE_MARKER;
    }

    IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );

    /* Process the current PUBLISH. */
    callbackParam.u.message.info = pOperation->u.publish.publishInfo;

    _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,
                                         &callbackParam );

    /* Free buffers associated with the current PUBLISH message. */
    IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );
    IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );

    /* Free the incoming PUBLISH operation. */
    IotMqtt_FreeOperation( pOperation );
}

/*-----------------------------------------------------------*/

void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
                           IotTaskPoolJob_t pSendJob,
                           void * pContext )
{
    size_t bytesSent = 0;
    bool destroyOperation = false, waitable = false, networkPending = false;
    _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;

    /* Check parameters. The task pool and job parameter is not used when asserts
     * are disabled. */
    ( void ) pTaskPool;
    ( void ) pSendJob;
    IotMqtt_Assert( pSendJob == pOperation->job );

    /* The given operation must have an allocated packet and be waiting for a status. */
    IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
    IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );
    IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );

    /* Check if this operation is waitable. */
    waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;

    /* Check PUBLISH retry counts and limits. */
    if( pOperation->u.operation.periodic.retry.limit > 0 )
    {
        if( _checkRetryLimit( pOperation ) == false )
        {
            pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Send an operation that is waiting for a response. */
    if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
    {
        IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",
                     pMqttConnection,
                     IotMqtt_OperationType( pOperation->u.operation.type ),
                     pOperation );

        /* Transmit the MQTT packet from the operation over the network. */
        bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
                                                              pOperation->u.operation.pMqttPacket,
                                                              pOperation->u.operation.packetSize );

        /* Check transmission status. */
        if( bytesSent != pOperation->u.operation.packetSize )
        {
            pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;
        }
        else
        {
            /* DISCONNECT operations are considered successful upon successful
             * transmission. In addition, non-waitable operations with no callback
             * may also be considered successful. */
            if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )
            {
                /* DISCONNECT operations are always waitable. */
                IotMqtt_Assert( waitable == true );

                pOperation->u.operation.status = IOT_MQTT_SUCCESS;
            }
            else if( waitable == false )
            {
                if( pOperation->u.operation.notify.callback.function == NULL )
                {
                    pOperation->u.operation.status = IOT_MQTT_SUCCESS;
                }
                else
                {
                    EMPTY_ELSE_MARKER;
                }
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Check if this operation requires further processing. */
    if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
    {
        /* Check if this operation should be scheduled for retransmission. */
        if( pOperation->u.operation.periodic.retry.limit > 0 )
        {
            if( _scheduleNextRetry( pOperation ) == false )
            {
                pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;
            }
            else
            {
                /* A successfully scheduled PUBLISH retry is awaiting a response
                 * from the network. */
                networkPending = true;
            }
        }
        else
        {
            /* Decrement reference count to signal completion of send job. Check
             * if the operation should be destroyed. */
            IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

            if( waitable == true )
            {
                destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }

            /* If the operation should not be destroyed, transfer it from the
             * pending processing to the pending response list. */
            if( destroyOperation == false )
            {
                if( IotLink_IsLinked( &( pOperation->link ) ) == true )
                {
                    IotListDouble_Remove( &( pOperation->link ) );
                }

                IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
                                          &( pOperation->link ) );

                /* This operation is now awaiting a response from the network. */
                networkPending = true;
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }

            IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Destroy the operation or notify of completion if necessary. */
    if( destroyOperation == true )
    {
        _IotMqtt_DestroyOperation( pOperation );
    }
    else
    {
        /* Do not check the operation status if a network response is pending,
         * since a network response could modify the status. */
        if( networkPending == false )
        {
            /* Notify of operation completion if this job set a status. */
            if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )
            {
                _IotMqtt_Notify( pOperation );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
}

/*-----------------------------------------------------------*/

void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
                                         IotTaskPoolJob_t pOperationJob,
                                         void * pContext )
{
    _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
    IotMqttCallbackParam_t callbackParam = { 0 };

    /* Check parameters. The task pool and job parameter is not used when asserts
     * are disabled. */
    ( void ) pTaskPool;
    ( void ) pOperationJob;
    IotMqtt_Assert( pOperationJob == pOperation->job );

    /* The operation's callback function and status must be set. */
    IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );
    IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );

    callbackParam.mqttConnection = pOperation->pMqttConnection;
    callbackParam.u.operation.type = pOperation->u.operation.type;
    callbackParam.u.operation.reference = pOperation;
    callbackParam.u.operation.result = pOperation->u.operation.status;

    /* Invoke the user callback function. */
    pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,
                                                      &callbackParam );

    /* Attempt to destroy the operation once the user callback returns. */
    if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
    {
        _IotMqtt_DestroyOperation( pOperation );
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }
}

/*-----------------------------------------------------------*/

IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,
                                           IotTaskPoolRoutine_t jobRoutine,
                                           uint32_t delay )
{
    IotMqttError_t status = IOT_MQTT_SUCCESS;
    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;

    /* Check that job routine is valid. */
    IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||
                    ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||
                    ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );

    /* Creating a new job should never fail when parameters are valid. */
    taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,
                                            pOperation,
                                            &( pOperation->jobStorage ),
                                            &( pOperation->job ) );
    IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );

    /* Schedule the new job with a delay. */
    taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
                                                   pOperation->job,
                                                   delay );

    if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
    {
        /* Scheduling a newly-created job should never be invalid or illegal. */
        IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );
        IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );

        IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",
                    pOperation->pMqttConnection,
                    IotMqtt_OperationType( pOperation->u.operation.type ),
                    pOperation,
                    IotTaskPool_strerror( taskPoolStatus ) );

        status = IOT_MQTT_SCHEDULING_ERROR;
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    return status;
}

/*-----------------------------------------------------------*/

_mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
                                           IotMqttOperationType_t type,
                                           const uint16_t * pPacketIdentifier )
{
    bool waitable = false;
    IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
    _mqttOperation_t * pResult = NULL;
    IotLink_t * pResultLink = NULL;
    _operationMatchParam_t operationMatchParams = { 0 };

    /* Set the members of the search parameter. */
    operationMatchParams.type = type;
    operationMatchParams.pPacketIdentifier = pPacketIdentifier;

    if( pPacketIdentifier != NULL )
    {
        IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "
                     "with packet identifier %hu.",
                     pMqttConnection,
                     IotMqtt_OperationType( type ),
                     *pPacketIdentifier );
    }
    else
    {
        IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",
                     pMqttConnection,
                     IotMqtt_OperationType( type ) );
    }

    /* Find and remove the first matching element in the list. */
    IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
    pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),
                                                NULL,
                                                _mqttOperation_match,
                                                &operationMatchParams );

    /* Check if a match was found. */
    if( pResultLink != NULL )
    {
        /* Get operation pointer and check if it is waitable. */
        pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );
        waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;

        /* Check if the matched operation is a PUBLISH with retry. If it is, cancel
         * the retry job. */
        if( pResult->u.operation.periodic.retry.limit > 0 )
        {
            taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
                                                    pResult->job,
                                                    NULL );

            /* If the retry job could not be canceled, then it is currently
             * executing. Ignore the operation. */
            if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
            {
                pResult = NULL;
            }
            else
            {
                /* Check job reference counts. A waitable operation should have a
                 * count of 2; a non-waitable operation should have a count of 1. */
                IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );
            }
        }
        else
        {
            /* An operation with no retry in the pending responses list should
             * always have a job reference of 1. */
            IotMqtt_Assert( pResult->u.operation.jobReference == 1 );

            /* Increment job references of a waitable operation to prevent Wait from
             * destroying this operation if it times out. */
            if( waitable == true )
            {
                ( pResult->u.operation.jobReference )++;

                IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",
                             pMqttConnection,
                             IotMqtt_OperationType( type ),
                             pResult,
                             ( long int ) ( pResult->u.operation.jobReference - 1 ),
                             ( long int ) ( pResult->u.operation.jobReference ) );
            }
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    if( pResult != NULL )
    {
        IotLogDebug( "(MQTT connection %p) Found operation %s.",
                     pMqttConnection,
                     IotMqtt_OperationType( type ) );

        /* Remove the matched operation from the list. */
        IotListDouble_Remove( &( pResult->link ) );
    }
    else
    {
        IotLogDebug( "(MQTT connection %p) Operation %s not found.",
                     pMqttConnection,
                     IotMqtt_OperationType( type ) );
    }

    IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );

    return pResult;
}

/*-----------------------------------------------------------*/

void _IotMqtt_Notify( _mqttOperation_t * pOperation )
{
    IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;
    _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;

    /* Check if operation is waitable. */
    bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;

    /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected
     * subscriptions are removed by the deserializer, so not removed here. */
    if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )
    {
        switch( pOperation->u.operation.status )
        {
            case IOT_MQTT_SUCCESS:
                break;

            case IOT_MQTT_SERVER_REFUSED:
                break;

            default:
                _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,
                                                     pOperation->u.operation.packetIdentifier,
                                                     -1 );
                break;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Schedule callback invocation for non-waitable operation. */
    if( waitable == false )
    {
        /* Non-waitable operation should have job reference of 1. */
        IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );

        /* Schedule an invocation of the callback. */
        if( pOperation->u.operation.notify.callback.function != NULL )
        {
            IotMutex_Lock( &( pMqttConnection->referencesMutex ) );

            status = _IotMqtt_ScheduleOperation( pOperation,
                                                 _IotMqtt_ProcessCompletedOperation,
                                                 0 );

            if( status == IOT_MQTT_SUCCESS )
            {
                IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",
                             pOperation->pMqttConnection,
                             IotMqtt_OperationType( pOperation->u.operation.type ),
                             pOperation );

                /* Place the scheduled operation back in the list of operations pending
                 * processing. */
                if( IotLink_IsLinked( &( pOperation->link ) ) == true )
                {
                    IotListDouble_Remove( &( pOperation->link ) );
                }
                else
                {
                    EMPTY_ELSE_MARKER;
                }

                IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
                                          &( pOperation->link ) );
            }
            else
            {
                IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",
                            pOperation->pMqttConnection,
                            IotMqtt_OperationType( pOperation->u.operation.type ),
                            pOperation );
            }

            IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
        }
        else
        {
            EMPTY_ELSE_MARKER;
        }
    }
    else
    {
        EMPTY_ELSE_MARKER;
    }

    /* Operations that weren't scheduled may be destroyed. */
    if( status == IOT_MQTT_SCHEDULING_ERROR )
    {
        /* Decrement reference count of operations not scheduled. */
        if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
        {
            _IotMqtt_DestroyOperation( pOperation );
        }
        else
        {
            /* Post to a waitable operation's semaphore. */
            if( waitable == true )
            {
                IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "
                             "notified of completion.",
                             pOperation->pMqttConnection,
                             IotMqtt_OperationType( pOperation->u.operation.type ),
                             pOperation );

                IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );
            }
            else
            {
                EMPTY_ELSE_MARKER;
            }
        }
    }
    else
    {
        IotMqtt_Assert( status == IOT_MQTT_SUCCESS );
    }
}

/*-----------------------------------------------------------*/
