| /* |
| * Copyright (c) 2018 Nordic Semiconductor ASA |
| * |
| * SPDX-License-Identifier: Apache-2.0 |
| */ |
| |
| /** @file mqtt.c |
| * |
| * @brief MQTT Client API Implementation. |
| */ |
| |
| #include <logging/log.h> |
| LOG_MODULE_REGISTER(net_mqtt, CONFIG_MQTT_LOG_LEVEL); |
| |
| #include <net/mqtt.h> |
| |
| #include "mqtt_transport.h" |
| #include "mqtt_internal.h" |
| #include "mqtt_os.h" |
| |
| static void client_reset(struct mqtt_client *client) |
| { |
| MQTT_STATE_INIT(client); |
| |
| client->internal.last_activity = 0U; |
| client->internal.rx_buf_datalen = 0U; |
| client->internal.remaining_payload = 0U; |
| } |
| |
| /** @brief Initialize tx buffer. */ |
| static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf) |
| { |
| memset(client->tx_buf, 0, client->tx_buf_size); |
| buf->cur = client->tx_buf; |
| buf->end = client->tx_buf + client->tx_buf_size; |
| } |
| |
| /**@brief Notifies disconnection event to the application. |
| * |
| * @param[in] client Identifies the client for which the procedure is requested. |
| * @param[in] result Reason for disconnection. |
| */ |
| static void disconnect_event_notify(struct mqtt_client *client, int result) |
| { |
| struct mqtt_evt evt; |
| |
| /* Determine appropriate event to generate. */ |
| if (MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) { |
| evt.type = MQTT_EVT_DISCONNECT; |
| evt.result = result; |
| } else { |
| evt.type = MQTT_EVT_CONNACK; |
| evt.result = -ECONNREFUSED; |
| } |
| |
| /* Notify application. */ |
| event_notify(client, &evt); |
| |
| /* Reset internal state. */ |
| client_reset(client); |
| } |
| |
| void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt) |
| { |
| if (client->evt_cb != NULL) { |
| mqtt_mutex_unlock(client); |
| |
| client->evt_cb(client, evt); |
| |
| mqtt_mutex_lock(client); |
| } |
| } |
| |
| static void client_disconnect(struct mqtt_client *client, int result) |
| { |
| int err_code; |
| |
| err_code = mqtt_transport_disconnect(client); |
| if (err_code < 0) { |
| MQTT_ERR("Failed to disconnect transport!"); |
| } |
| |
| disconnect_event_notify(client, result); |
| } |
| |
| static int client_connect(struct mqtt_client *client) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| err_code = mqtt_transport_connect(client); |
| if (err_code < 0) { |
| return err_code; |
| } |
| |
| tx_buf_init(client, &packet); |
| MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED); |
| |
| err_code = connect_request_encode(client, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| /* Send MQTT identification message to broker. */ |
| err_code = mqtt_transport_write(client, packet.cur, |
| packet.end - packet.cur); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| client->internal.last_activity = mqtt_sys_tick_in_ms_get(); |
| |
| /* Reset the unanswered ping count for a new connection */ |
| client->unacked_ping = 0; |
| |
| MQTT_TRC("Connect completed"); |
| |
| return 0; |
| |
| error: |
| client_disconnect(client, err_code); |
| return err_code; |
| } |
| |
| static int client_read(struct mqtt_client *client) |
| { |
| int err_code; |
| |
| if (client->internal.remaining_payload > 0) { |
| return -EBUSY; |
| } |
| |
| err_code = mqtt_handle_rx(client); |
| if (err_code < 0) { |
| client_disconnect(client, err_code); |
| } |
| |
| return err_code; |
| } |
| |
| static int client_write(struct mqtt_client *client, const u8_t *data, |
| u32_t datalen) |
| { |
| int err_code; |
| |
| MQTT_TRC("[%p]: Transport writing %d bytes.", client, datalen); |
| |
| err_code = mqtt_transport_write(client, data, datalen); |
| if (err_code < 0) { |
| MQTT_TRC("Transport write failed, err_code = %d, " |
| "closing connection", err_code); |
| client_disconnect(client, err_code); |
| return err_code; |
| } |
| |
| MQTT_TRC("[%p]: Transport write complete.", client); |
| client->internal.last_activity = mqtt_sys_tick_in_ms_get(); |
| |
| return 0; |
| } |
| |
| static int client_write_msg(struct mqtt_client *client, |
| const struct msghdr *message) |
| { |
| int err_code; |
| |
| MQTT_TRC("[%p]: Transport writing message.", client); |
| |
| err_code = mqtt_transport_write_msg(client, message); |
| if (err_code < 0) { |
| MQTT_TRC("Transport write failed, err_code = %d, " |
| "closing connection", err_code); |
| client_disconnect(client, err_code); |
| return err_code; |
| } |
| |
| MQTT_TRC("[%p]: Transport write complete.", client); |
| client->internal.last_activity = mqtt_sys_tick_in_ms_get(); |
| |
| return 0; |
| } |
| |
| void mqtt_client_init(struct mqtt_client *client) |
| { |
| NULL_PARAM_CHECK_VOID(client); |
| |
| memset(client, 0, sizeof(*client)); |
| |
| MQTT_STATE_INIT(client); |
| mqtt_mutex_init(client); |
| |
| client->protocol_version = MQTT_VERSION_3_1_1; |
| client->clean_session = 1U; |
| client->keepalive = MQTT_KEEPALIVE; |
| } |
| |
| #if defined(CONFIG_SOCKS) |
| int mqtt_client_set_proxy(struct mqtt_client *client, |
| struct sockaddr *proxy_addr, |
| socklen_t addrlen) |
| { |
| if (IS_ENABLED(CONFIG_SOCKS)) { |
| if (!client || !proxy_addr) { |
| return -EINVAL; |
| } |
| |
| client->transport.proxy.addrlen = addrlen; |
| memcpy(&client->transport.proxy.addr, proxy_addr, addrlen); |
| |
| return 0; |
| } |
| |
| return -ENOTSUP; |
| } |
| #endif |
| |
| int mqtt_connect(struct mqtt_client *client) |
| { |
| int err_code; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(client->client_id.utf8); |
| |
| mqtt_mutex_lock(client); |
| |
| if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) { |
| err_code = -ENOMEM; |
| goto error; |
| } |
| |
| err_code = client_connect(client); |
| |
| error: |
| if (err_code < 0) { |
| client_reset(client); |
| } |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| static int verify_tx_state(const struct mqtt_client *client) |
| { |
| if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) { |
| return -ENOTCONN; |
| } |
| |
| return 0; |
| } |
| |
| int mqtt_publish(struct mqtt_client *client, |
| const struct mqtt_publish_param *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| struct iovec io_vector[2]; |
| struct msghdr msg; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| MQTT_TRC("[CID %p]:[State 0x%02x]: >> Topic size 0x%08x, " |
| "Data size 0x%08x", client, client->internal.state, |
| param->message.topic.topic.size, |
| param->message.payload.len); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = publish_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| io_vector[0].iov_base = packet.cur; |
| io_vector[0].iov_len = packet.end - packet.cur; |
| io_vector[1].iov_base = param->message.payload.data; |
| io_vector[1].iov_len = param->message.payload.len; |
| |
| memset(&msg, 0, sizeof(msg)); |
| |
| msg.msg_iov = io_vector; |
| msg.msg_iovlen = ARRAY_SIZE(io_vector); |
| |
| err_code = client_write_msg(client, &msg); |
| |
| error: |
| MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", |
| client, client->internal.state, err_code); |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_publish_qos1_ack(struct mqtt_client *client, |
| const struct mqtt_puback_param *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", |
| client, client->internal.state, param->message_id); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = publish_ack_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| |
| error: |
| MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", |
| client, client->internal.state, err_code); |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_publish_qos2_receive(struct mqtt_client *client, |
| const struct mqtt_pubrec_param *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", |
| client, client->internal.state, param->message_id); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = publish_receive_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| |
| error: |
| MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", |
| client, client->internal.state, err_code); |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_publish_qos2_release(struct mqtt_client *client, |
| const struct mqtt_pubrel_param *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", |
| client, client->internal.state, param->message_id); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = publish_release_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| |
| error: |
| MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", |
| client, client->internal.state, err_code); |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_publish_qos2_complete(struct mqtt_client *client, |
| const struct mqtt_pubcomp_param *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| MQTT_TRC("[CID %p]:[State 0x%02x]: >> Message id 0x%04x", |
| client, client->internal.state, param->message_id); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = publish_complete_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| error: |
| MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", |
| client, client->internal.state, err_code); |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_disconnect(struct mqtt_client *client) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = disconnect_encode(&packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| client_disconnect(client, 0); |
| |
| error: |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_subscribe(struct mqtt_client *client, |
| const struct mqtt_subscription_list *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| MQTT_TRC("[CID %p]:[State 0x%02x]: >> message id 0x%04x " |
| "topic count 0x%04x", client, client->internal.state, |
| param->message_id, param->list_count); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = subscribe_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| |
| error: |
| MQTT_TRC("[CID %p]:[State 0x%02x]: << result 0x%08x", |
| client, client->internal.state, err_code); |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_unsubscribe(struct mqtt_client *client, |
| const struct mqtt_subscription_list *param) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| NULL_PARAM_CHECK(param); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = unsubscribe_encode(param, &packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| |
| error: |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_ping(struct mqtt_client *client) |
| { |
| int err_code; |
| struct buf_ctx packet; |
| |
| NULL_PARAM_CHECK(client); |
| |
| mqtt_mutex_lock(client); |
| |
| tx_buf_init(client, &packet); |
| |
| err_code = verify_tx_state(client); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = ping_request_encode(&packet); |
| if (err_code < 0) { |
| goto error; |
| } |
| |
| err_code = client_write(client, packet.cur, packet.end - packet.cur); |
| |
| if (client->unacked_ping >= INT8_MAX) { |
| MQTT_TRC("PING count overflow!"); |
| } else { |
| client->unacked_ping++; |
| } |
| |
| error: |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| int mqtt_abort(struct mqtt_client *client) |
| { |
| mqtt_mutex_lock(client); |
| |
| NULL_PARAM_CHECK(client); |
| |
| if (client->internal.state != MQTT_STATE_IDLE) { |
| client_disconnect(client, -ECONNABORTED); |
| } |
| |
| mqtt_mutex_unlock(client); |
| |
| return 0; |
| } |
| |
| int mqtt_live(struct mqtt_client *client) |
| { |
| int err_code = 0; |
| u32_t elapsed_time; |
| bool ping_sent = false; |
| |
| NULL_PARAM_CHECK(client); |
| |
| mqtt_mutex_lock(client); |
| |
| elapsed_time = mqtt_elapsed_time_in_ms_get( |
| client->internal.last_activity); |
| if ((client->keepalive > 0) && |
| (elapsed_time >= (client->keepalive * 1000))) { |
| err_code = mqtt_ping(client); |
| ping_sent = true; |
| } |
| |
| mqtt_mutex_unlock(client); |
| |
| if (ping_sent) { |
| return err_code; |
| } else { |
| return -EAGAIN; |
| } |
| } |
| |
| u32_t mqtt_keepalive_time_left(const struct mqtt_client *client) |
| { |
| u32_t elapsed_time = mqtt_elapsed_time_in_ms_get( |
| client->internal.last_activity); |
| u32_t keepalive_ms = 1000U * client->keepalive; |
| |
| if (client->keepalive == 0) { |
| /* Keep alive not enabled. */ |
| return UINT32_MAX; |
| } |
| |
| if (keepalive_ms <= elapsed_time) { |
| return 0; |
| } |
| |
| return keepalive_ms - elapsed_time; |
| } |
| |
| int mqtt_input(struct mqtt_client *client) |
| { |
| int err_code = 0; |
| |
| NULL_PARAM_CHECK(client); |
| |
| mqtt_mutex_lock(client); |
| |
| MQTT_TRC("state:0x%08x", client->internal.state); |
| |
| if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) { |
| err_code = client_read(client); |
| } else { |
| err_code = -EACCES; |
| } |
| |
| mqtt_mutex_unlock(client); |
| |
| return err_code; |
| } |
| |
| static int read_publish_payload(struct mqtt_client *client, void *buffer, |
| size_t length, bool shall_block) |
| { |
| int ret; |
| |
| NULL_PARAM_CHECK(client); |
| |
| mqtt_mutex_lock(client); |
| |
| if (client->internal.remaining_payload == 0U) { |
| ret = 0; |
| goto exit; |
| } |
| |
| if (client->internal.remaining_payload < length) { |
| length = client->internal.remaining_payload; |
| } |
| |
| ret = mqtt_transport_read(client, buffer, length, shall_block); |
| if (!shall_block && ret == -EAGAIN) { |
| goto exit; |
| } |
| |
| if (ret <= 0) { |
| if (ret == 0) { |
| ret = -ENOTCONN; |
| } |
| |
| client_disconnect(client, ret); |
| goto exit; |
| } |
| |
| client->internal.remaining_payload -= ret; |
| |
| exit: |
| mqtt_mutex_unlock(client); |
| |
| return ret; |
| } |
| |
| int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, |
| size_t length) |
| { |
| return read_publish_payload(client, buffer, length, false); |
| } |
| |
| int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, |
| size_t length) |
| { |
| return read_publish_payload(client, buffer, length, true); |
| } |
| |
| int mqtt_readall_publish_payload(struct mqtt_client *client, u8_t *buffer, |
| size_t length) |
| { |
| u8_t *end = buffer + length; |
| |
| while (buffer < end) { |
| int ret = mqtt_read_publish_payload_blocking(client, buffer, |
| end - buffer); |
| |
| if (ret < 0) { |
| return ret; |
| } else if (ret == 0) { |
| return -EIO; |
| } |
| |
| buffer += ret; |
| } |
| |
| return 0; |
| } |