| /* |
| * Copyright (c) 2016 Intel Corporation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "mqtt.h" |
| |
| #include <errno.h> |
| |
| int mqtt_cb_publish(struct mqtt_app_ctx_t *app, |
| int (*cb_publish)(struct mqtt_msg_t *msg)) |
| { |
| app->cb_publish = cb_publish; |
| |
| return 0; |
| } |
| |
| int mqtt_init(struct mqtt_app_ctx_t *app) |
| { |
| nano_sem_init(&app->sem); |
| nano_sem_give(&app->sem); |
| |
| return 0; |
| } |
| |
| int mqtt_configure(struct mqtt_app_ctx_t *app, const char *client_id, |
| int clean_session, enum mqtt_protocol proto) |
| { |
| app->client->client_id = client_id; |
| app->client->clean_session = clean_session; |
| app->client->proto = proto; |
| |
| return 0; |
| } |
| |
| int mqtt_auth(struct mqtt_app_ctx_t *app, const char *username, |
| const char *pass) |
| { |
| app->client->username = username; |
| app->client->pass = pass; |
| |
| return 0; |
| } |
| |
| int mqtt_will(struct mqtt_app_ctx_t *app, const char *topic, |
| const char *msg, enum mqtt_qos qos, int retained) |
| { |
| app->client->will_enabled = 1; |
| app->client->will_topic = topic; |
| app->client->will_payload = msg; |
| app->client->will_qos = qos; |
| app->client->will_retained = retained; |
| |
| return 0; |
| } |
| |
| int mqtt_buffers(struct mqtt_app_ctx_t *app, struct app_buf_t *tx_buf, |
| struct app_buf_t *rx_buf) |
| { |
| app->tx_buf = tx_buf; |
| app->rx_buf = rx_buf; |
| |
| return 0; |
| } |
| |
| int mqtt_network(struct mqtt_app_ctx_t *app, struct netz_ctx_t *netz_ctx) |
| { |
| app->netz_ctx = netz_ctx; |
| |
| return 0; |
| } |
| |
| int mqtt_connect(struct mqtt_app_ctx_t *app) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| struct app_buf_t *rx_buf; |
| int session; |
| int conn_ack; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| rx_buf = app->rx_buf; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| rc = mqtt_pack_connect(tx_buf, app->client); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| rc = netz_tcp(netz_ctx); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_connack(rx_buf, &session, &conn_ack); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| if (app->client->clean_session == 1) { |
| nano_sem_give(&app->sem); |
| if (session == 0 && conn_ack == 0) { |
| return 0; |
| } |
| return -EINVAL; |
| } |
| /* |
| * TODO: validate CleanSession = 0 |
| */ |
| nano_sem_give(&app->sem); |
| return 0; |
| } |
| |
| int mqtt_disconnect(struct mqtt_app_ctx_t *app) |
| { |
| int rc; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| rc = mqtt_pack_disconnect(app->tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(app->netz_ctx, app->tx_buf); |
| nano_sem_give(&app->sem); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| return 0; |
| } |
| |
| |
| static inline int mqtt_publish_qos1(struct mqtt_app_ctx_t *app, uint16_t id) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *rx_buf; |
| uint8_t pkt_type; |
| uint8_t dup = 0; |
| uint16_t rcv_pkt_id = 0; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| rx_buf = app->rx_buf; |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| if (pkt_type != MQTT_PUBACK) { |
| return -EINVAL; |
| } |
| |
| if (rcv_pkt_id != id) { |
| return -EINVAL; |
| } |
| |
| return 0; |
| } |
| |
| static inline int mqtt_publish_qos2(struct mqtt_app_ctx_t *app, |
| const uint16_t pkt_id) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| struct app_buf_t *rx_buf; |
| uint8_t pkt_type; |
| uint8_t dup = 0; |
| uint16_t rcv_pkt_id; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| rx_buf = app->rx_buf; |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| if (pkt_type != MQTT_PUBREC) { |
| return -EINVAL; |
| } |
| if (rcv_pkt_id != pkt_id) { |
| return -EINVAL; |
| } |
| |
| rc = mqtt_pack_pubrel(tx_buf, dup, pkt_id); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| if (pkt_type != MQTT_PUBCOMP) { |
| return -EINVAL; |
| } |
| if (rcv_pkt_id != pkt_id) { |
| return -EINVAL; |
| } |
| |
| return 0; |
| } |
| |
| int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg, |
| enum mqtt_qos qos, int retained) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| |
| msg->dup = 0; |
| msg->qos = qos; |
| msg->retained = retained; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| if (msg->qos != MQTT_QoS0) { |
| mqtt_next_pktid(app->client, &msg->pkt_id); |
| } else { |
| msg->pkt_id = 0; |
| } |
| |
| rc = mqtt_pack_publish(tx_buf, msg); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| switch (msg->qos) { |
| case MQTT_QoS0: |
| break; |
| |
| case MQTT_QoS1: |
| rc = mqtt_publish_qos1(app, msg->pkt_id); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| break; |
| |
| case MQTT_QoS2: |
| rc = mqtt_publish_qos2(app, msg->pkt_id); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| break; |
| |
| default: |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| nano_sem_give(&app->sem); |
| return 0; |
| } |
| |
| int mqtt_pingreq(struct mqtt_app_ctx_t *app) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| struct app_buf_t *rx_buf; |
| uint8_t pkt_type; |
| uint8_t dup = 0; |
| uint16_t rcv_pkt_id; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| rx_buf = app->rx_buf; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| rc = mqtt_pack_pingreq(tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| if (pkt_type != MQTT_PINGRESP) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| nano_sem_give(&app->sem); |
| return 0; |
| } |
| |
| int mqtt_subscribe(struct mqtt_app_ctx_t *app, |
| char *topic, enum mqtt_qos qos) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| struct app_buf_t *rx_buf; |
| uint16_t pkt_id; |
| uint16_t rcv_pkt_id; |
| int granted_qos; |
| int dup = 0; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| rx_buf = app->rx_buf; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| mqtt_next_pktid(app->client, &pkt_id); |
| |
| rc = mqtt_pack_subscribe(tx_buf, dup, pkt_id, topic, qos); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_suback(rx_buf, &rcv_pkt_id, &granted_qos); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| if (rcv_pkt_id != pkt_id) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| nano_sem_give(&app->sem); |
| return granted_qos; |
| } |
| |
| int mqtt_unsubscribe(struct mqtt_app_ctx_t *app, char *topic) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| struct app_buf_t *rx_buf; |
| uint16_t pkt_id; |
| uint16_t rcv_pkt_id; |
| int dup = 0; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| rx_buf = app->rx_buf; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| mqtt_next_pktid(app->client, &pkt_id); |
| |
| rc = mqtt_pack_unsubscribe(tx_buf, dup, pkt_id, topic); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_unsuback(rx_buf, &rcv_pkt_id); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| if (rcv_pkt_id != pkt_id) { |
| nano_sem_give(&app->sem); |
| return -EINVAL; |
| } |
| |
| nano_sem_give(&app->sem); |
| return 0; |
| } |
| |
| int mqtt_handle_publish(struct mqtt_app_ctx_t *app) |
| { |
| struct mqtt_msg_t msg; |
| uint16_t rcv_pkt_id; |
| int8_t rcv_pkt_type; |
| uint8_t dup; |
| int rc; |
| |
| rc = mqtt_unpack_publish(app->rx_buf, &msg); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| switch (msg.qos) { |
| case MQTT_QoS0: |
| break; |
| |
| case MQTT_QoS1: |
| rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBACK, msg.pkt_id, 0); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(app->netz_ctx, app->tx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| break; |
| |
| case MQTT_QoS2: |
| rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBREC, msg.pkt_id, 0); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(app->netz_ctx, app->tx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| rc = netz_rx(app->netz_ctx, app->rx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| rc = mqtt_unpack_ack(app->rx_buf, &rcv_pkt_type, &dup, |
| &rcv_pkt_id); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| if (rcv_pkt_type != MQTT_PUBREL) { |
| return -EINVAL; |
| } |
| |
| if (msg.pkt_id != rcv_pkt_id) { |
| return -EINVAL; |
| } |
| |
| rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBCOMP, msg.pkt_id, 0); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(app->netz_ctx, app->tx_buf); |
| if (rc != 0) { |
| return -EIO; |
| } |
| |
| break; |
| |
| default: |
| return -EINVAL; |
| } |
| |
| if (app->cb_publish != NULL) { |
| return app->cb_publish(&msg); |
| } |
| |
| return 0; |
| } |
| |
| int mqtt_handle_pingreq(struct mqtt_app_ctx_t *app) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *tx_buf; |
| int rc; |
| |
| netz_ctx = app->netz_ctx; |
| tx_buf = app->tx_buf; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| rc = mqtt_pack_pingresp(tx_buf); |
| if (rc != 0) { |
| return -EINVAL; |
| } |
| |
| rc = netz_tx(netz_ctx, tx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| nano_sem_give(&app->sem); |
| return 0; |
| } |
| |
| int mqtt_read(struct mqtt_app_ctx_t *app) |
| { |
| struct netz_ctx_t *netz_ctx; |
| struct app_buf_t *rx_buf; |
| int pkt_type; |
| int rc; |
| |
| nano_sem_take(&app->sem, TICKS_UNLIMITED); |
| |
| netz_ctx = app->netz_ctx; |
| rx_buf = app->rx_buf; |
| |
| |
| rc = netz_rx(netz_ctx, rx_buf); |
| if (rc != 0) { |
| nano_sem_give(&app->sem); |
| return -EIO; |
| } |
| |
| if (rx_buf->length < 2) { |
| nano_sem_give(&app->sem); |
| return -ENOMEM; |
| } |
| |
| pkt_type = (rx_buf->buf[0] & 0xF0) >> 4; |
| |
| rc = -EINVAL; |
| |
| /* This switch-case will be used for packet-handling routines |
| * that will be coded in future releases. |
| */ |
| switch (pkt_type) { |
| case MQTT_PUBLISH: |
| rc = mqtt_handle_publish(app); |
| break; |
| case MQTT_CONNECT: |
| case MQTT_CONNACK: |
| case MQTT_PUBACK: |
| case MQTT_PUBREC: |
| case MQTT_PUBREL: |
| case MQTT_PUBCOMP: |
| case MQTT_SUBSCRIBE: |
| case MQTT_SUBACK: |
| case MQTT_UNSUBSCRIBE: |
| case MQTT_UNSUBACK: |
| break; |
| case MQTT_PINGREQ: |
| rc = mqtt_handle_pingreq(app); |
| break; |
| case MQTT_PINGRESP: |
| break; |
| case MQTT_DISCONNECT: |
| break; |
| } |
| |
| nano_sem_give(&app->sem); |
| return rc; |
| } |