blob: c267b474881eeba7161cc9526a5758900912e6eb [file] [log] [blame]
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "mqtt.h"
#include <errno.h>
int mqtt_cb_publish(struct mqtt_app_ctx_t *app,
int (*cb_publish)(struct mqtt_msg_t *msg))
{
app->cb_publish = cb_publish;
return 0;
}
int mqtt_init(struct mqtt_app_ctx_t *app)
{
nano_sem_init(&app->sem);
nano_sem_give(&app->sem);
return 0;
}
int mqtt_configure(struct mqtt_app_ctx_t *app, const char *client_id,
int clean_session, enum mqtt_protocol proto)
{
app->client->client_id = client_id;
app->client->clean_session = clean_session;
app->client->proto = proto;
return 0;
}
int mqtt_auth(struct mqtt_app_ctx_t *app, const char *username,
const char *pass)
{
app->client->username = username;
app->client->pass = pass;
return 0;
}
int mqtt_will(struct mqtt_app_ctx_t *app, const char *topic,
const char *msg, enum mqtt_qos qos, int retained)
{
app->client->will_enabled = 1;
app->client->will_topic = topic;
app->client->will_payload = msg;
app->client->will_qos = qos;
app->client->will_retained = retained;
return 0;
}
int mqtt_buffers(struct mqtt_app_ctx_t *app, struct app_buf_t *tx_buf,
struct app_buf_t *rx_buf)
{
app->tx_buf = tx_buf;
app->rx_buf = rx_buf;
return 0;
}
int mqtt_network(struct mqtt_app_ctx_t *app, struct netz_ctx_t *netz_ctx)
{
app->netz_ctx = netz_ctx;
return 0;
}
int mqtt_connect(struct mqtt_app_ctx_t *app)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
int session;
int conn_ack;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
rx_buf = app->rx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
rc = mqtt_pack_connect(tx_buf, app->client);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tcp(netz_ctx);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = mqtt_unpack_connack(rx_buf, &session, &conn_ack);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
if (app->client->clean_session == 1) {
nano_sem_give(&app->sem);
if (session == 0 && conn_ack == 0) {
return 0;
}
return -EINVAL;
}
/*
* TODO: validate CleanSession = 0
*/
nano_sem_give(&app->sem);
return 0;
}
int mqtt_disconnect(struct mqtt_app_ctx_t *app)
{
int rc;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
rc = mqtt_pack_disconnect(app->tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tx(app->netz_ctx, app->tx_buf);
nano_sem_give(&app->sem);
if (rc != 0) {
return -EIO;
}
return 0;
}
static inline int mqtt_publish_qos1(struct mqtt_app_ctx_t *app, uint16_t id)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *rx_buf;
uint8_t pkt_type;
uint8_t dup = 0;
uint16_t rcv_pkt_id = 0;
int rc;
netz_ctx = app->netz_ctx;
rx_buf = app->rx_buf;
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
return -EIO;
}
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) {
return -EINVAL;
}
if (pkt_type != MQTT_PUBACK) {
return -EINVAL;
}
if (rcv_pkt_id != id) {
return -EINVAL;
}
return 0;
}
static inline int mqtt_publish_qos2(struct mqtt_app_ctx_t *app,
const uint16_t pkt_id)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
uint8_t pkt_type;
uint8_t dup = 0;
uint16_t rcv_pkt_id;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
rx_buf = app->rx_buf;
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
return -EIO;
}
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) {
return -EINVAL;
}
if (pkt_type != MQTT_PUBREC) {
return -EINVAL;
}
if (rcv_pkt_id != pkt_id) {
return -EINVAL;
}
rc = mqtt_pack_pubrel(tx_buf, dup, pkt_id);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
return -EIO;
}
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
return -EIO;
}
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) {
return -EINVAL;
}
if (pkt_type != MQTT_PUBCOMP) {
return -EINVAL;
}
if (rcv_pkt_id != pkt_id) {
return -EINVAL;
}
return 0;
}
int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg,
enum mqtt_qos qos, int retained)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
msg->dup = 0;
msg->qos = qos;
msg->retained = retained;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
if (msg->qos != MQTT_QoS0) {
mqtt_next_pktid(app->client, &msg->pkt_id);
} else {
msg->pkt_id = 0;
}
rc = mqtt_pack_publish(tx_buf, msg);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
switch (msg->qos) {
case MQTT_QoS0:
break;
case MQTT_QoS1:
rc = mqtt_publish_qos1(app, msg->pkt_id);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
break;
case MQTT_QoS2:
rc = mqtt_publish_qos2(app, msg->pkt_id);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
break;
default:
nano_sem_give(&app->sem);
return -EINVAL;
}
nano_sem_give(&app->sem);
return 0;
}
int mqtt_pingreq(struct mqtt_app_ctx_t *app)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
uint8_t pkt_type;
uint8_t dup = 0;
uint16_t rcv_pkt_id;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
rx_buf = app->rx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
rc = mqtt_pack_pingreq(tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
if (pkt_type != MQTT_PINGRESP) {
nano_sem_give(&app->sem);
return -EINVAL;
}
nano_sem_give(&app->sem);
return 0;
}
int mqtt_subscribe(struct mqtt_app_ctx_t *app,
char *topic, enum mqtt_qos qos)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
uint16_t pkt_id;
uint16_t rcv_pkt_id;
int granted_qos;
int dup = 0;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
rx_buf = app->rx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
mqtt_next_pktid(app->client, &pkt_id);
rc = mqtt_pack_subscribe(tx_buf, dup, pkt_id, topic, qos);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = mqtt_unpack_suback(rx_buf, &rcv_pkt_id, &granted_qos);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
if (rcv_pkt_id != pkt_id) {
nano_sem_give(&app->sem);
return -EINVAL;
}
nano_sem_give(&app->sem);
return granted_qos;
}
int mqtt_unsubscribe(struct mqtt_app_ctx_t *app, char *topic)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
uint16_t pkt_id;
uint16_t rcv_pkt_id;
int dup = 0;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
rx_buf = app->rx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
mqtt_next_pktid(app->client, &pkt_id);
rc = mqtt_pack_unsubscribe(tx_buf, dup, pkt_id, topic);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = mqtt_unpack_unsuback(rx_buf, &rcv_pkt_id);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
if (rcv_pkt_id != pkt_id) {
nano_sem_give(&app->sem);
return -EINVAL;
}
nano_sem_give(&app->sem);
return 0;
}
int mqtt_handle_publish(struct mqtt_app_ctx_t *app)
{
struct mqtt_msg_t msg;
uint16_t rcv_pkt_id;
int8_t rcv_pkt_type;
uint8_t dup;
int rc;
rc = mqtt_unpack_publish(app->rx_buf, &msg);
if (rc != 0) {
return -EINVAL;
}
switch (msg.qos) {
case MQTT_QoS0:
break;
case MQTT_QoS1:
rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBACK, msg.pkt_id, 0);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(app->netz_ctx, app->tx_buf);
if (rc != 0) {
return -EIO;
}
break;
case MQTT_QoS2:
rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBREC, msg.pkt_id, 0);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(app->netz_ctx, app->tx_buf);
if (rc != 0) {
return -EIO;
}
rc = netz_rx(app->netz_ctx, app->rx_buf);
if (rc != 0) {
return -EIO;
}
rc = mqtt_unpack_ack(app->rx_buf, &rcv_pkt_type, &dup,
&rcv_pkt_id);
if (rc != 0) {
return -EINVAL;
}
if (rcv_pkt_type != MQTT_PUBREL) {
return -EINVAL;
}
if (msg.pkt_id != rcv_pkt_id) {
return -EINVAL;
}
rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBCOMP, msg.pkt_id, 0);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(app->netz_ctx, app->tx_buf);
if (rc != 0) {
return -EIO;
}
break;
default:
return -EINVAL;
}
if (app->cb_publish != NULL) {
return app->cb_publish(&msg);
}
return 0;
}
int mqtt_handle_pingreq(struct mqtt_app_ctx_t *app)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
rc = mqtt_pack_pingresp(tx_buf);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
nano_sem_give(&app->sem);
return 0;
}
int mqtt_read(struct mqtt_app_ctx_t *app)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *rx_buf;
int pkt_type;
int rc;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
netz_ctx = app->netz_ctx;
rx_buf = app->rx_buf;
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
if (rx_buf->length < 2) {
nano_sem_give(&app->sem);
return -ENOMEM;
}
pkt_type = (rx_buf->buf[0] & 0xF0) >> 4;
rc = -EINVAL;
/* This switch-case will be used for packet-handling routines
* that will be coded in future releases.
*/
switch (pkt_type) {
case MQTT_PUBLISH:
rc = mqtt_handle_publish(app);
break;
case MQTT_CONNECT:
case MQTT_CONNACK:
case MQTT_PUBACK:
case MQTT_PUBREC:
case MQTT_PUBREL:
case MQTT_PUBCOMP:
case MQTT_SUBSCRIBE:
case MQTT_SUBACK:
case MQTT_UNSUBSCRIBE:
case MQTT_UNSUBACK:
break;
case MQTT_PINGREQ:
rc = mqtt_handle_pingreq(app);
break;
case MQTT_PINGRESP:
break;
case MQTT_DISCONNECT:
break;
}
nano_sem_give(&app->sem);
return rc;
}