| /* |
| * Copyright (c) 2022 G-Technologies Sdn. Bhd. |
| * |
| * SPDX-License-Identifier: Apache-2.0 |
| */ |
| |
| #include <zephyr/kernel.h> |
| #include <zephyr/shell/shell_mqtt.h> |
| #include <zephyr/init.h> |
| #include <zephyr/logging/log.h> |
| #include <string.h> |
| #include <stdio.h> |
| #include <zephyr/drivers/hwinfo.h> |
| |
/* Transport instance backing the MQTT shell. */
SHELL_MQTT_DEFINE(shell_transport_mqtt);
/* Shell instance bound to the MQTT transport. The second argument is an empty
 * string (presumably the prompt - TODO confirm against SHELL_DEFINE).
 */
SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
	     CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
	     CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);

/* Register this file's log module under the name shell_mqtt. */
LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);
| |
/* Network management events this backend listens for */
#define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
/* Poll timeout (ms) used while waiting for the broker's reply to connect/subscribe */
#define CONNECT_TIMEOUT_MS 2000
/* Time budget (ms) of one listening pass in the process work */
#define LISTEN_TIMEOUT_MS 500
/* NOTE: despite the _MS suffix this is a k_timeout_t, not a plain integer */
#define MQTT_SEND_DELAY_MS K_MSEC(100)
/* Delay before the process work runs again */
#define PROCESS_INTERVAL K_SECONDS(2)
/* Stack size of the backend's dedicated work queue thread */
#define SHELL_MQTT_WORKQ_STACK_SIZE 2048

/* Broker user name; NULL when the Kconfig symbol is not defined */
#ifdef CONFIG_SHELL_MQTT_SERVER_USERNAME
#define MQTT_USERNAME CONFIG_SHELL_MQTT_SERVER_USERNAME
#else
#define MQTT_USERNAME NULL
#endif /* CONFIG_SHELL_MQTT_SERVER_USERNAME */

/* Broker password; NULL when the Kconfig symbol is not defined */
#ifdef CONFIG_SHELL_MQTT_SERVER_PASSWORD
#define MQTT_PASSWORD CONFIG_SHELL_MQTT_SERVER_PASSWORD
#else
#define MQTT_PASSWORD NULL
#endif /* CONFIG_SHELL_MQTT_SERVER_PASSWORD */
| |
/* Backend context; NULL until init() points it at the transport's ctx */
struct shell_mqtt *sh_mqtt;
/* Stack for the backend's work queue thread (started in init()) */
K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);

/* Forward declaration: client_init() installs this as the MQTT event callback */
static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
| |
| static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay) |
| { |
| return k_work_reschedule_for_queue(&sh_mqtt->workq, dwork, delay); |
| } |
| |
| static inline int sh_mqtt_work_submit(struct k_work *work) |
| { |
| return k_work_submit_to_queue(&sh_mqtt->workq, work); |
| } |
| |
| /* Lock the context of the shell mqtt */ |
| static inline int sh_mqtt_context_lock(k_timeout_t timeout) |
| { |
| return k_mutex_lock(&sh_mqtt->lock, timeout); |
| } |
| |
| /* Unlock the context of the shell mqtt */ |
| static inline void sh_mqtt_context_unlock(void) |
| { |
| (void)k_mutex_unlock(&sh_mqtt->lock); |
| } |
| |
| static void sh_mqtt_rx_rb_flush(void) |
| { |
| uint8_t c; |
| uint32_t size = ring_buf_size_get(&sh_mqtt->rx_rb); |
| |
| while (size > 0) { |
| size = ring_buf_get(&sh_mqtt->rx_rb, &c, 1U); |
| } |
| } |
| |
| bool __weak shell_mqtt_get_devid(char *id, int id_max_len) |
| { |
| uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE]; |
| ssize_t length; |
| |
| length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE); |
| if (length <= 0) { |
| return false; |
| } |
| |
| (void)memset(id, 0, id_max_len); |
| length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len); |
| |
| return length > 0; |
| } |
| |
| static void prepare_fds(void) |
| { |
| if (sh_mqtt->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) { |
| sh_mqtt->fds[0].fd = sh_mqtt->mqtt_cli.transport.tcp.sock; |
| } |
| |
| sh_mqtt->fds[0].events = ZSOCK_POLLIN; |
| sh_mqtt->nfds = 1; |
| } |
| |
| static void clear_fds(void) |
| { |
| sh_mqtt->nfds = 0; |
| } |
| |
| /* |
| * Upon successful completion, poll() shall return a non-negative value. A positive value indicates |
| * the total number of pollfd structures that have selected events (that is, those for which the |
| * revents member is non-zero). A value of 0 indicates that the call timed out and no file |
| * descriptors have been selected. Upon failure, poll() shall return -1 and set errno to indicate |
| * the error. |
| */ |
| static int wait(int timeout) |
| { |
| int rc = 0; |
| |
| if (sh_mqtt->nfds > 0) { |
| rc = zsock_poll(sh_mqtt->fds, sh_mqtt->nfds, timeout); |
| if (rc < 0) { |
| LOG_ERR("poll error: %d", errno); |
| } |
| } |
| |
| return rc; |
| } |
| |
/*
 * Resolve the configured broker address/port into sh_mqtt->haddr.
 *
 * Any previous lookup result is released first. The pointer is reset to NULL
 * after being freed and again when the lookup fails, so a failed resolution
 * can never leave a stale pointer that a later call would free a second time.
 *
 * Returns 0 on success, otherwise the zsock_getaddrinfo() error code.
 */
static int get_mqtt_broker_addrinfo(void)
{
	int rc;
	struct zsock_addrinfo hints = { .ai_family = AF_INET,
					.ai_socktype = SOCK_STREAM,
					.ai_protocol = 0 };

	if (sh_mqtt->haddr != NULL) {
		zsock_freeaddrinfo(sh_mqtt->haddr);
		sh_mqtt->haddr = NULL;
	}

	rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
			       STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh_mqtt->haddr);
	if (rc == 0) {
		LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
			CONFIG_SHELL_MQTT_SERVER_PORT);

		return 0;
	}

	/* The result pointer is not meaningful after a failed lookup */
	sh_mqtt->haddr = NULL;

	LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
		CONFIG_SHELL_MQTT_SERVER_PORT);

	return rc;
}
| |
| /* Close MQTT connection properly and cleanup socket */ |
| static void sh_mqtt_close_and_cleanup(void) |
| { |
| /* Initialize to negative value so that the mqtt_abort case can run */ |
| int rc = -1; |
| |
| /* If both network & mqtt connected, mqtt_disconnect will send a |
| * disconnection packet to the broker, it will invoke |
| * mqtt_evt_handler:MQTT_EVT_DISCONNECT if success |
| */ |
| if ((sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) && |
| (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) { |
| rc = mqtt_disconnect(&sh_mqtt->mqtt_cli); |
| } |
| |
| /* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */ |
| if (rc < 0) { |
| /* mqtt_abort doesn't send disconnection packet to the broker, but it |
| * makes sure that the MQTT connection is aborted locally and will |
| * always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT |
| */ |
| (void)mqtt_abort(&sh_mqtt->mqtt_cli); |
| } |
| |
| /* Cleanup socket */ |
| clear_fds(); |
| } |
| |
| static void broker_init(void) |
| { |
| struct sockaddr_in *broker4 = (struct sockaddr_in *)&sh_mqtt->broker; |
| |
| broker4->sin_family = AF_INET; |
| broker4->sin_port = htons(CONFIG_SHELL_MQTT_SERVER_PORT); |
| |
| net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh_mqtt->haddr->ai_addr)->sin_addr); |
| } |
| |
| static void client_init(void) |
| { |
| static struct mqtt_utf8 password; |
| static struct mqtt_utf8 username; |
| |
| password.utf8 = (uint8_t *)MQTT_PASSWORD; |
| password.size = strlen(MQTT_PASSWORD); |
| username.utf8 = (uint8_t *)MQTT_USERNAME; |
| username.size = strlen(MQTT_USERNAME); |
| |
| mqtt_client_init(&sh_mqtt->mqtt_cli); |
| |
| /* MQTT client configuration */ |
| sh_mqtt->mqtt_cli.broker = &sh_mqtt->broker; |
| sh_mqtt->mqtt_cli.evt_cb = mqtt_evt_handler; |
| sh_mqtt->mqtt_cli.client_id.utf8 = (uint8_t *)sh_mqtt->device_id; |
| sh_mqtt->mqtt_cli.client_id.size = strlen(sh_mqtt->device_id); |
| sh_mqtt->mqtt_cli.password = &password; |
| sh_mqtt->mqtt_cli.user_name = &username; |
| sh_mqtt->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1; |
| |
| /* MQTT buffers configuration */ |
| sh_mqtt->mqtt_cli.rx_buf = sh_mqtt->buf.rx; |
| sh_mqtt->mqtt_cli.rx_buf_size = sizeof(sh_mqtt->buf.rx); |
| sh_mqtt->mqtt_cli.tx_buf = sh_mqtt->buf.tx; |
| sh_mqtt->mqtt_cli.tx_buf_size = sizeof(sh_mqtt->buf.tx); |
| |
| /* MQTT transport configuration */ |
| sh_mqtt->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE; |
| } |
| |
| /* Work routine to process MQTT packet and keep alive MQTT connection */ |
| static void sh_mqtt_process_handler(struct k_work *work) |
| { |
| ARG_UNUSED(work); |
| int rc; |
| int64_t remaining = LISTEN_TIMEOUT_MS; |
| int64_t start_time = k_uptime_get(); |
| |
| if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) { |
| LOG_DBG("%s_work while %s", "process", "network disconnected"); |
| return; |
| } |
| |
| /* If context can't be locked, that means net conn cb locked it */ |
| if (sh_mqtt_context_lock(K_NO_WAIT) != 0) { |
| /* In that case we should simply return */ |
| LOG_DBG("%s_work unable to lock context", "process"); |
| return; |
| } |
| |
| if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) { |
| LOG_DBG("MQTT %s", "not connected"); |
| goto process_error; |
| } |
| |
| if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) { |
| LOG_DBG("%s_work while %s", "process", "MQTT not subscribed"); |
| goto process_error; |
| } |
| |
| LOG_DBG("MQTT %s", "Processing"); |
| /* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */ |
| while ((remaining > 0) && (sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) && |
| (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) && |
| (sh_mqtt->subscribe_state == SHELL_MQTT_SUBSCRIBED)) { |
| LOG_DBG("Listening to socket"); |
| rc = wait(remaining); |
| if (rc > 0) { |
| LOG_DBG("Process socket for MQTT packet"); |
| rc = mqtt_input(&sh_mqtt->mqtt_cli); |
| if (rc != 0) { |
| LOG_ERR("%s error: %d", "processed: mqtt_input", rc); |
| goto process_error; |
| } |
| } else if (rc < 0) { |
| goto process_error; |
| } |
| |
| LOG_DBG("MQTT %s", "Keepalive"); |
| rc = mqtt_live(&sh_mqtt->mqtt_cli); |
| if ((rc != 0) && (rc != -EAGAIN)) { |
| LOG_ERR("%s error: %d", "mqtt_live", rc); |
| goto process_error; |
| } |
| |
| remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get(); |
| } |
| |
| /* Reschedule the process work */ |
| LOG_DBG("Scheduling %s work", "process"); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, K_SECONDS(2)); |
| sh_mqtt_context_unlock(); |
| return; |
| |
| process_error: |
| LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect"); |
| sh_mqtt_close_and_cleanup(); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1)); |
| sh_mqtt_context_unlock(); |
| } |
| |
| static void sh_mqtt_subscribe_handler(struct k_work *work) |
| { |
| ARG_UNUSED(work); |
| /* Subscribe config information */ |
| struct mqtt_topic subs_topic = { .topic = { .utf8 = sh_mqtt->sub_topic, |
| .size = strlen(sh_mqtt->sub_topic) }, |
| .qos = MQTT_QOS_1_AT_LEAST_ONCE }; |
| const struct mqtt_subscription_list subs_list = { .list = &subs_topic, |
| .list_count = 1U, |
| .message_id = 1U }; |
| int rc; |
| |
| if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) { |
| LOG_DBG("%s_work while %s", "subscribe", "network disconnected"); |
| return; |
| } |
| |
| /* If context can't be locked, that means net conn cb locked it */ |
| if (sh_mqtt_context_lock(K_NO_WAIT) != 0) { |
| /* In that case we should simply return */ |
| LOG_DBG("%s_work unable to lock context", "subscribe"); |
| return; |
| } |
| |
| if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) { |
| LOG_DBG("%s_work while %s", "subscribe", "transport disconnected"); |
| goto subscribe_error; |
| } |
| |
| rc = mqtt_subscribe(&sh_mqtt->mqtt_cli, &subs_list); |
| if (rc == 0) { |
| /* Wait for mqtt's connack */ |
| LOG_DBG("Listening to socket"); |
| rc = wait(CONNECT_TIMEOUT_MS); |
| if (rc > 0) { |
| LOG_DBG("Process socket for MQTT packet"); |
| rc = mqtt_input(&sh_mqtt->mqtt_cli); |
| if (rc != 0) { |
| LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc); |
| goto subscribe_error; |
| } |
| } else if (rc < 0) { |
| goto subscribe_error; |
| } |
| |
| /* No suback, fail */ |
| if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) { |
| goto subscribe_error; |
| } |
| |
| LOG_DBG("Scheduling MQTT process work"); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, PROCESS_INTERVAL); |
| sh_mqtt_context_unlock(); |
| |
| LOG_INF("Logs will be published to: %s", sh_mqtt->pub_topic); |
| LOG_INF("Subscribing shell cmds from: %s", sh_mqtt->sub_topic); |
| |
| return; |
| } |
| |
| subscribe_error: |
| LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe"); |
| sh_mqtt_close_and_cleanup(); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2)); |
| sh_mqtt_context_unlock(); |
| } |
| |
| /* Work routine to connect to MQTT */ |
| static void sh_mqtt_connect_handler(struct k_work *work) |
| { |
| ARG_UNUSED(work); |
| int rc; |
| |
| if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) { |
| LOG_DBG("%s_work while %s", "connect", "network disconnected"); |
| return; |
| } |
| |
| /* If context can't be locked, that means net conn cb locked it */ |
| if (sh_mqtt_context_lock(K_NO_WAIT) != 0) { |
| /* In that case we should simply return */ |
| LOG_DBG("%s_work unable to lock context", "connect"); |
| return; |
| } |
| |
| if (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) { |
| __ASSERT(0, "MQTT shouldn't be already connected"); |
| LOG_ERR("MQTT shouldn't be already connected"); |
| goto connect_error; |
| } |
| |
| /* Resolve the broker URL */ |
| LOG_DBG("Resolving DNS"); |
| rc = get_mqtt_broker_addrinfo(); |
| if (rc != 0) { |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1)); |
| sh_mqtt_context_unlock(); |
| return; |
| } |
| |
| LOG_DBG("Initializing MQTT client"); |
| broker_init(); |
| client_init(); |
| |
| /* Try to connect to mqtt */ |
| LOG_DBG("Connecting to MQTT broker"); |
| rc = mqtt_connect(&sh_mqtt->mqtt_cli); |
| if (rc != 0) { |
| LOG_ERR("%s error: %d", "mqtt_connect", rc); |
| goto connect_error; |
| } |
| |
| /* Prepare port config */ |
| LOG_DBG("Preparing socket"); |
| prepare_fds(); |
| |
| /* Wait for mqtt's connack */ |
| LOG_DBG("Listening to socket"); |
| rc = wait(CONNECT_TIMEOUT_MS); |
| if (rc > 0) { |
| LOG_DBG("Process socket for MQTT packet"); |
| rc = mqtt_input(&sh_mqtt->mqtt_cli); |
| if (rc != 0) { |
| LOG_ERR("%s error: %d", "connect: mqtt_input", rc); |
| goto connect_error; |
| } |
| } else if (rc < 0) { |
| goto connect_error; |
| } |
| |
| /* No connack, fail */ |
| if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) { |
| goto connect_error; |
| } |
| |
| LOG_DBG("Scheduling %s work", "subscribe"); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->subscribe_dwork, K_SECONDS(2)); |
| sh_mqtt_context_unlock(); |
| return; |
| |
| connect_error: |
| LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect"); |
| sh_mqtt_close_and_cleanup(); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2)); |
| sh_mqtt_context_unlock(); |
| } |
| |
| static int sh_mqtt_publish(uint8_t *data, uint32_t len) |
| { |
| sh_mqtt->pub_data.message.payload.data = data; |
| sh_mqtt->pub_data.message.payload.len = len; |
| sh_mqtt->pub_data.message_id++; |
| |
| return mqtt_publish(&sh_mqtt->mqtt_cli, &sh_mqtt->pub_data); |
| } |
| |
| static int sh_mqtt_publish_tx_buf(bool is_work) |
| { |
| int rc; |
| |
| rc = sh_mqtt_publish(&sh_mqtt->tx_buf.buf[0], sh_mqtt->tx_buf.len); |
| memset(&sh_mqtt->tx_buf, 0, sizeof(sh_mqtt->tx_buf)); |
| if (rc != 0) { |
| LOG_ERR("MQTT publish error: %d", rc); |
| return rc; |
| } |
| |
| /* Arbitrary delay to not kill the session */ |
| if (!is_work) { |
| k_sleep(MQTT_SEND_DELAY_MS); |
| } |
| |
| return rc; |
| } |
| |
| static void sh_mqtt_publish_handler(struct k_work *work) |
| { |
| ARG_UNUSED(work); |
| int rc; |
| |
| (void)sh_mqtt_context_lock(K_FOREVER); |
| |
| rc = sh_mqtt_publish_tx_buf(true); |
| if (rc != 0) { |
| LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish"); |
| sh_mqtt_close_and_cleanup(); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2)); |
| } |
| |
| sh_mqtt_context_unlock(); |
| } |
| |
| static void cancel_dworks_and_cleanup(void) |
| { |
| (void)k_work_cancel_delayable(&sh_mqtt->connect_dwork); |
| (void)k_work_cancel_delayable(&sh_mqtt->subscribe_dwork); |
| (void)k_work_cancel_delayable(&sh_mqtt->process_dwork); |
| (void)k_work_cancel_delayable(&sh_mqtt->publish_dwork); |
| sh_mqtt_close_and_cleanup(); |
| } |
| |
| static void net_disconnect_handler(struct k_work *work) |
| { |
| ARG_UNUSED(work); |
| |
| LOG_WRN("Network %s", "disconnected"); |
| sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED; |
| |
| /* Stop all possible work */ |
| (void)sh_mqtt_context_lock(K_FOREVER); |
| cancel_dworks_and_cleanup(); |
| sh_mqtt_context_unlock(); |
| /* If the transport was requested, the connect work will be rescheduled |
| * when internet is connected again |
| */ |
| } |
| |
| /* Network connection event handler */ |
| static void network_evt_handler(struct net_mgmt_event_callback *cb, uint32_t mgmt_event, |
| struct net_if *iface) |
| { |
| if ((mgmt_event == NET_EVENT_L4_CONNECTED) && |
| (sh_mqtt->network_state == SHELL_MQTT_NETWORK_DISCONNECTED)) { |
| LOG_WRN("Network %s", "connected"); |
| sh_mqtt->network_state = SHELL_MQTT_NETWORK_CONNECTED; |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1)); |
| } else if ((mgmt_event == NET_EVENT_L4_DISCONNECTED) && |
| (sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED)) { |
| (void)sh_mqtt_work_submit(&sh_mqtt->net_disconnected_work); |
| } |
| } |
| |
| static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt) |
| { |
| switch (evt->type) { |
| case MQTT_EVT_CONNACK: |
| if (evt->result != 0) { |
| sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED; |
| LOG_ERR("MQTT %s %d", "connect failed", evt->result); |
| break; |
| } |
| |
| sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED; |
| LOG_WRN("MQTT %s", "client connected!"); |
| |
| break; |
| case MQTT_EVT_SUBACK: |
| if (evt->result != 0) { |
| LOG_ERR("MQTT subscribe: %s", "error"); |
| sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED; |
| break; |
| } |
| |
| LOG_WRN("MQTT subscribe: %s", "ok"); |
| sh_mqtt->subscribe_state = SHELL_MQTT_SUBSCRIBED; |
| break; |
| |
| case MQTT_EVT_UNSUBACK: |
| LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id); |
| sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED; |
| break; |
| |
| case MQTT_EVT_DISCONNECT: |
| LOG_WRN("MQTT disconnected: %d", evt->result); |
| sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED; |
| sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED; |
| break; |
| |
| case MQTT_EVT_PUBLISH: { |
| const struct mqtt_publish_param *pub = &evt->param.publish; |
| uint32_t payload_left; |
| size_t size; |
| int rc; |
| |
| payload_left = pub->message.payload.len; |
| |
| LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left); |
| LOG_DBG(" id: %d, qos: %d", pub->message_id, pub->message.topic.qos); |
| LOG_DBG(" item: %s", pub->message.topic.topic.utf8); |
| |
| /* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */ |
| if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) { |
| struct mqtt_puback_param puback = { .message_id = pub->message_id }; |
| |
| (void)mqtt_publish_qos1_ack(client, &puback); |
| } |
| |
| while (payload_left > 0) { |
| /* Attempt to claim `payload_left` bytes of buffer in rb */ |
| size = (size_t)ring_buf_put_claim(&sh_mqtt->rx_rb, &sh_mqtt->rx_rb_ptr, |
| payload_left); |
| /* Read `size` bytes of payload from mqtt */ |
| rc = mqtt_read_publish_payload_blocking(client, sh_mqtt->rx_rb_ptr, size); |
| |
| /* errno value, return */ |
| if (rc < 0) { |
| (void)ring_buf_put_finish(&sh_mqtt->rx_rb, 0U); |
| sh_mqtt_rx_rb_flush(); |
| return; |
| } |
| |
| size = (size_t)rc; |
| /* Indicate that `size` bytes of payload has been written into rb */ |
| (void)ring_buf_put_finish(&sh_mqtt->rx_rb, size); |
| /* Update `payload_left` */ |
| payload_left -= size; |
| /* Tells the shell that we have new data for it */ |
| sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context); |
| /* Arbitrary sleep for the shell to do its thing */ |
| (void)k_msleep(100); |
| } |
| |
| /* Shell won't execute the cmds without \r\n */ |
| while (true) { |
| /* Check if rb's free space is enough to fit in \r\n */ |
| size = ring_buf_space_get(&sh_mqtt->rx_rb); |
| if (size >= sizeof("\r\n")) { |
| (void)ring_buf_put(&sh_mqtt->rx_rb, "\r\n", sizeof("\r\n")); |
| break; |
| } |
| /* Arbitrary sleep for the shell to do its thing */ |
| (void)k_msleep(100); |
| } |
| |
| sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context); |
| break; |
| } |
| |
| case MQTT_EVT_PUBACK: |
| if (evt->result != 0) { |
| LOG_ERR("MQTT PUBACK error %d", evt->result); |
| break; |
| } |
| |
| LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id); |
| break; |
| |
| case MQTT_EVT_PINGRESP: |
| LOG_DBG("PINGRESP packet"); |
| break; |
| |
| default: |
| LOG_DBG("MQTT event received %d", evt->type); |
| break; |
| } |
| } |
| |
| static int init(const struct shell_transport *transport, const void *config, |
| shell_transport_handler_t evt_handler, void *context) |
| { |
| sh_mqtt = (struct shell_mqtt *)transport->ctx; |
| |
| (void)memset(sh_mqtt, 0, sizeof(struct shell_mqtt)); |
| |
| (void)k_mutex_init(&sh_mqtt->lock); |
| |
| if (!shell_mqtt_get_devid(sh_mqtt->device_id, DEVICE_ID_HEX_MAX_SIZE)) { |
| LOG_ERR("Unable to get device identity, using dummy value"); |
| (void)snprintf(sh_mqtt->device_id, sizeof("dummy"), "dummy"); |
| } |
| |
| LOG_DBG("Client ID is %s", sh_mqtt->device_id); |
| |
| (void)snprintf(sh_mqtt->pub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_tx", sh_mqtt->device_id); |
| (void)snprintf(sh_mqtt->sub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_rx", sh_mqtt->device_id); |
| |
| ring_buf_init(&sh_mqtt->rx_rb, RX_RB_SIZE, sh_mqtt->rx_rb_buf); |
| |
| LOG_DBG("Initializing shell MQTT backend"); |
| |
| sh_mqtt->shell_handler = evt_handler; |
| sh_mqtt->shell_context = context; |
| |
| sh_mqtt->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE; |
| sh_mqtt->pub_data.message.topic.topic.utf8 = (uint8_t *)sh_mqtt->pub_topic; |
| sh_mqtt->pub_data.message.topic.topic.size = |
| strlen(sh_mqtt->pub_data.message.topic.topic.utf8); |
| sh_mqtt->pub_data.dup_flag = 0U; |
| sh_mqtt->pub_data.retain_flag = 0U; |
| |
| /* Initialize the work queue */ |
| k_work_queue_init(&sh_mqtt->workq); |
| k_work_queue_start(&sh_mqtt->workq, sh_mqtt_workq_stack, |
| K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL); |
| (void)k_thread_name_set(&sh_mqtt->workq.thread, "sh_mqtt_workq"); |
| k_work_init(&sh_mqtt->net_disconnected_work, net_disconnect_handler); |
| k_work_init_delayable(&sh_mqtt->connect_dwork, sh_mqtt_connect_handler); |
| k_work_init_delayable(&sh_mqtt->subscribe_dwork, sh_mqtt_subscribe_handler); |
| k_work_init_delayable(&sh_mqtt->process_dwork, sh_mqtt_process_handler); |
| k_work_init_delayable(&sh_mqtt->publish_dwork, sh_mqtt_publish_handler); |
| |
| LOG_DBG("Initializing listener for network"); |
| net_mgmt_init_event_callback(&sh_mqtt->mgmt_cb, network_evt_handler, NET_EVENT_MASK); |
| |
| sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED; |
| sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED; |
| sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED; |
| |
| return 0; |
| } |
| |
| static int uninit(const struct shell_transport *transport) |
| { |
| ARG_UNUSED(transport); |
| |
| /* Not initialized yet */ |
| if (sh_mqtt == NULL) { |
| return -ENODEV; |
| } |
| |
| return 0; |
| } |
| |
| static int enable(const struct shell_transport *transport, bool blocking) |
| { |
| ARG_UNUSED(transport); |
| ARG_UNUSED(blocking); |
| |
| /* Not initialized yet */ |
| if (sh_mqtt == NULL) { |
| return -ENODEV; |
| } |
| |
| /* Listen for network connection status */ |
| net_mgmt_add_event_callback(&sh_mqtt->mgmt_cb); |
| conn_mgr_mon_resend_status(); |
| |
| return 0; |
| } |
| |
| static int write_data(const struct shell_transport *transport, const void *data, size_t length, |
| size_t *cnt) |
| { |
| ARG_UNUSED(transport); |
| int rc = 0; |
| struct k_work_sync ws; |
| size_t copy_len; |
| |
| *cnt = 0; |
| |
| /* Not initialized yet */ |
| if (sh_mqtt == NULL) { |
| return -ENODEV; |
| } |
| |
| /* Not connected to broker */ |
| if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) { |
| goto out; |
| } |
| |
| (void)k_work_cancel_delayable_sync(&sh_mqtt->publish_dwork, &ws); |
| |
| do { |
| if ((sh_mqtt->tx_buf.len + length - *cnt) > TX_BUF_SIZE) { |
| copy_len = TX_BUF_SIZE - sh_mqtt->tx_buf.len; |
| } else { |
| copy_len = length - *cnt; |
| } |
| |
| memcpy(sh_mqtt->tx_buf.buf + sh_mqtt->tx_buf.len, (uint8_t *)data + *cnt, copy_len); |
| sh_mqtt->tx_buf.len += copy_len; |
| |
| /* Send the data immediately if the buffer is full */ |
| if (sh_mqtt->tx_buf.len == TX_BUF_SIZE) { |
| rc = sh_mqtt_publish_tx_buf(false); |
| if (rc != 0) { |
| sh_mqtt_close_and_cleanup(); |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, |
| K_SECONDS(2)); |
| *cnt = length; |
| return rc; |
| } |
| } |
| |
| *cnt += copy_len; |
| } while (*cnt < length); |
| |
| if (sh_mqtt->tx_buf.len > 0) { |
| (void)sh_mqtt_work_reschedule(&sh_mqtt->publish_dwork, MQTT_SEND_DELAY_MS); |
| } |
| |
| /* Inform shell that it is ready for next TX */ |
| sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh_mqtt->shell_context); |
| |
| out: |
| /* We will always assume that we sent everything */ |
| *cnt = length; |
| return rc; |
| } |
| |
| static int read_data(const struct shell_transport *transport, void *data, size_t length, |
| size_t *cnt) |
| { |
| ARG_UNUSED(transport); |
| |
| /* Not initialized yet */ |
| if (sh_mqtt == NULL) { |
| return -ENODEV; |
| } |
| |
| /* Not subscribed yet */ |
| if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) { |
| *cnt = 0; |
| return 0; |
| } |
| |
| *cnt = ring_buf_get(&sh_mqtt->rx_rb, data, length); |
| |
| /* Inform the shell if there are still data in the rb */ |
| if (ring_buf_size_get(&sh_mqtt->rx_rb) > 0) { |
| sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context); |
| } |
| |
| return 0; |
| } |
| |
/* Shell transport API table wiring the hooks implemented in this file */
const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
							      .uninit = uninit,
							      .enable = enable,
							      .write = write_data,
							      .read = read_data };
| |
| static int enable_shell_mqtt(void) |
| { |
| |
| bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0; |
| uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ? |
| CONFIG_LOG_MAX_LEVEL : |
| CONFIG_SHELL_MQTT_INIT_LOG_LEVEL; |
| static const struct shell_backend_config_flags cfg_flags = { |
| .insert_mode = 0, |
| .echo = 0, |
| .obscure = 0, |
| .mode_delete = 0, |
| .use_colors = 0, |
| .use_vt100 = 0, |
| }; |
| |
| return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level); |
| } |
| |
| /* Function is used for testing purposes */ |
| const struct shell *shell_backend_mqtt_get_ptr(void) |
| { |
| return &shell_mqtt; |
| } |
| |
/* Run enable_shell_mqtt() at the APPLICATION init level */
SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);