blob: bbfb698316bbc45000e5011ef4573f1335f5d0ec [file] [log] [blame]
/*
* Copyright (c) 2016 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <net/mqtt.h>
#include "mqtt_pkt.h"
#include <net/net_ip.h>
#include <net/net_pkt.h>
#include <net/buf.h>
#include <errno.h>
#define MSG_SIZE CONFIG_MQTT_MSG_MAX_SIZE
#define MQTT_BUF_CTR (1 + CONFIG_MQTT_ADDITIONAL_BUFFER_CTR)
/* Memory pool internally used to handle messages that may exceed the size of
* system defined network buffer. By using this memory pool, routines don't deal
* with fragmentation, so algorithms are more easy to implement.
*/
NET_BUF_POOL_DEFINE(mqtt_msg_pool, MQTT_BUF_CTR, MSG_SIZE, 0, NULL);
#define MQTT_PUBLISHER_MIN_MSG_SIZE 2
int mqtt_tx_connect(struct mqtt_ctx *ctx, struct mqtt_connect_msg *msg)
{
struct net_buf *data = NULL;
struct net_pkt *tx = NULL;
int rc;
data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout);
if (data == NULL) {
return -ENOMEM;
}
ctx->clean_session = msg->clean_session ? 1 : 0;
rc = mqtt_pack_connect(data->data, &data->len, MSG_SIZE, msg);
if (rc != 0) {
rc = -EINVAL;
goto exit_connect;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_connect;
}
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
tx = NULL;
return rc;
exit_connect:
net_pkt_frag_unref(data);
return rc;
}
int mqtt_tx_disconnect(struct mqtt_ctx *ctx)
{
struct net_pkt *tx = NULL;
/* DISCONNECT is a zero length message: 2 bytes required, no payload */
u8_t msg[2];
u16_t len;
int rc;
rc = mqtt_pack_disconnect(msg, &len, sizeof(msg));
if (rc != 0) {
return -EINVAL;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
return -ENOMEM;
}
rc = net_pkt_append_all(tx, len, msg, ctx->net_timeout);
if (rc != true) {
rc = -ENOMEM;
goto exit_disconnect;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
goto exit_disconnect;
}
ctx->connected = 0;
tx = NULL;
if (ctx->disconnect) {
ctx->disconnect(ctx);
}
return rc;
exit_disconnect:
net_pkt_unref(tx);
return rc;
}
/**
* Writes the MQTT PUBxxx msg indicated by pkt_type with identifier 'id'
*
* @param [in] ctx MQTT context
* @param [in] id MQTT packet identifier
* @param [in] pkt_type MQTT packet type
*
* @retval 0 on success
* @retval -EINVAL
* @retval -ENOMEM if a tx pktfer is not available
* @retval -EIO on network error
*/
static
int mqtt_tx_pub_msgs(struct mqtt_ctx *ctx, u16_t id,
enum mqtt_packet pkt_type)
{
struct net_pkt *tx = NULL;
u8_t msg[4];
u16_t len;
int rc;
switch (pkt_type) {
case MQTT_PUBACK:
rc = mqtt_pack_puback(msg, &len, sizeof(msg), id);
break;
case MQTT_PUBCOMP:
rc = mqtt_pack_pubcomp(msg, &len, sizeof(msg), id);
break;
case MQTT_PUBREC:
rc = mqtt_pack_pubrec(msg, &len, sizeof(msg), id);
break;
case MQTT_PUBREL:
rc = mqtt_pack_pubrel(msg, &len, sizeof(msg), id);
break;
default:
return -EINVAL;
}
if (rc != 0) {
return -EINVAL;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
return -ENOMEM;
}
rc = net_pkt_append_all(tx, len, msg, ctx->net_timeout);
if (rc != true) {
rc = -ENOMEM;
goto exit_send;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
goto exit_send;
}
tx = NULL;
return rc;
exit_send:
net_pkt_unref(tx);
return rc;
}
int mqtt_tx_puback(struct mqtt_ctx *ctx, u16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBACK);
}
int mqtt_tx_pubcomp(struct mqtt_ctx *ctx, u16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBCOMP);
}
int mqtt_tx_pubrec(struct mqtt_ctx *ctx, u16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBREC);
}
int mqtt_tx_pubrel(struct mqtt_ctx *ctx, u16_t id)
{
return mqtt_tx_pub_msgs(ctx, id, MQTT_PUBREL);
}
int mqtt_tx_publish(struct mqtt_ctx *ctx, struct mqtt_publish_msg *msg)
{
struct net_buf *data = NULL;
struct net_pkt *tx = NULL;
int rc;
data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout);
if (data == NULL) {
return -ENOMEM;
}
rc = mqtt_pack_publish(data->data, &data->len, data->size, msg);
if (rc != 0) {
rc = -EINVAL;
goto exit_publish;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_publish;
}
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
tx = NULL;
return rc;
exit_publish:
net_pkt_frag_unref(data);
return rc;
}
int mqtt_tx_pingreq(struct mqtt_ctx *ctx)
{
struct net_pkt *tx = NULL;
u8_t msg[2];
u16_t len;
int rc;
rc = mqtt_pack_pingreq(msg, &len, sizeof(msg));
if (rc != 0) {
return -EINVAL;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
return -ENOMEM;
}
rc = net_pkt_append_all(tx, len, msg, ctx->net_timeout);
if (rc != true) {
rc = -ENOMEM;
goto exit_pingreq;
}
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
goto exit_pingreq;
}
tx = NULL;
return rc;
exit_pingreq:
net_pkt_unref(tx);
return rc;
}
int mqtt_tx_subscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
const char *topics[], const enum mqtt_qos qos[])
{
struct net_buf *data = NULL;
struct net_pkt *tx = NULL;
int rc;
data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout);
if (data == NULL) {
return -ENOMEM;
}
rc = mqtt_pack_subscribe(data->data, &data->len, data->size,
pkt_id, items, topics, qos);
if (rc != 0) {
rc = -EINVAL;
goto exit_subs;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_subs;
}
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
tx = NULL;
return rc;
exit_subs:
net_pkt_frag_unref(data);
return rc;
}
int mqtt_tx_unsubscribe(struct mqtt_ctx *ctx, u16_t pkt_id, u8_t items,
const char *topics[])
{
struct net_buf *data = NULL;
struct net_pkt *tx = NULL;
int rc;
data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout);
if (data == NULL) {
return -ENOMEM;
}
rc = mqtt_pack_unsubscribe(data->data, &data->len, data->size, pkt_id,
items, topics);
if (rc != 0) {
rc = -EINVAL;
goto exit_unsub;
}
tx = net_pkt_get_tx(ctx->net_ctx, ctx->net_timeout);
if (tx == NULL) {
rc = -ENOMEM;
goto exit_unsub;
}
net_pkt_frag_add(tx, data);
data = NULL;
rc = net_context_send(tx, NULL, ctx->net_timeout, NULL, NULL);
if (rc < 0) {
net_pkt_unref(tx);
}
tx = NULL;
return rc;
exit_unsub:
net_pkt_frag_unref(data);
return rc;
}
int mqtt_rx_connack(struct mqtt_ctx *ctx, struct net_buf *rx, int clean_session)
{
u16_t len;
u8_t connect_rc;
u8_t session;
u8_t *data;
int rc;
data = rx->data;
len = rx->len;
/* CONNACK is 4 bytes len */
rc = mqtt_unpack_connack(data, len, &session, &connect_rc);
if (rc != 0) {
rc = -EINVAL;
goto exit_connect;
}
switch (clean_session) {
/* new session */
case 1:
/* server acks there is no previous session
* and server connection return code is OK
*/
if (session == 0 && connect_rc == 0) {
rc = 0;
} else {
rc = -EINVAL;
goto exit_connect;
}
break;
/* previous session */
case 0:
/* TODO */
/* FALLTHROUGH */
default:
rc = -EINVAL;
goto exit_connect;
}
ctx->connected = 1;
if (ctx->connect) {
ctx->connect(ctx);
}
exit_connect:
return rc;
}
/**
* Parses and validates the MQTT PUBxxxx message contained in the rx packet.
*
*
* @details It validates against message structure and Packet Identifier.
* For the MQTT PUBREC and PUBREL messages, this function writes the
* corresponding MQTT PUB msg.
*
* @param ctx MQTT context
* @param rx RX packet
* @param type MQTT Packet type
*
* @retval 0 on success
* @retval -EINVAL on error
*/
static
int mqtt_rx_pub_msgs(struct mqtt_ctx *ctx, struct net_buf *rx,
enum mqtt_packet type)
{
int (*unpack)(u8_t *, u16_t, u16_t *) = NULL;
int (*response)(struct mqtt_ctx *, u16_t) = NULL;
u16_t pkt_id;
u16_t len;
u8_t *data;
int rc;
switch (type) {
case MQTT_PUBACK:
unpack = mqtt_unpack_puback;
break;
case MQTT_PUBCOMP:
unpack = mqtt_unpack_pubcomp;
break;
case MQTT_PUBREC:
unpack = mqtt_unpack_pubrec;
response = mqtt_tx_pubrel;
break;
case MQTT_PUBREL:
unpack = mqtt_unpack_pubrel;
response = mqtt_tx_pubcomp;
break;
default:
return -EINVAL;
}
data = rx->data;
len = rx->len;
/* 4 bytes message */
rc = unpack(data, len, &pkt_id);
if (rc != 0) {
return -EINVAL;
}
/* Only MQTT_APP_SUBSCRIBER, MQTT_APP_PUBLISHER_SUBSCRIBER and
* MQTT_APP_SERVER apps must receive the MQTT_PUBREL msg.
*/
if (type == MQTT_PUBREL) {
if (ctx->app_type != MQTT_APP_PUBLISHER) {
rc = ctx->publish_rx(ctx, NULL, pkt_id, MQTT_PUBREL);
} else {
rc = -EINVAL;
}
} else {
rc = ctx->publish_tx(ctx, pkt_id, type);
}
if (rc != 0) {
return -EINVAL;
}
if (!response) {
return 0;
}
rc = response(ctx, pkt_id);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_puback(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBACK);
}
int mqtt_rx_pubcomp(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBCOMP);
}
int mqtt_rx_pubrec(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBREC);
}
int mqtt_rx_pubrel(struct mqtt_ctx *ctx, struct net_buf *rx)
{
return mqtt_rx_pub_msgs(ctx, rx, MQTT_PUBREL);
}
int mqtt_rx_pingresp(struct mqtt_ctx *ctx, struct net_buf *rx)
{
int rc;
ARG_UNUSED(ctx);
/* 2 bytes message */
rc = mqtt_unpack_pingresp(rx->data, rx->len);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_suback(struct mqtt_ctx *ctx, struct net_buf *rx)
{
enum mqtt_qos suback_qos[CONFIG_MQTT_SUBSCRIBE_MAX_TOPICS];
u16_t pkt_id;
u16_t len;
u8_t items;
u8_t *data;
int rc;
data = rx->data;
len = rx->len;
rc = mqtt_unpack_suback(data, len, &pkt_id, &items,
CONFIG_MQTT_SUBSCRIBE_MAX_TOPICS, suback_qos);
if (rc != 0) {
return -EINVAL;
}
if (!ctx->subscribe) {
return -EINVAL;
}
rc = ctx->subscribe(ctx, pkt_id, items, suback_qos);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_unsuback(struct mqtt_ctx *ctx, struct net_buf *rx)
{
u16_t pkt_id;
u16_t len;
u8_t *data;
int rc;
data = rx->data;
len = rx->len;
/* 4 bytes message */
rc = mqtt_unpack_unsuback(data, len, &pkt_id);
if (rc != 0) {
return -EINVAL;
}
if (!ctx->unsubscribe) {
return -EINVAL;
}
rc = ctx->unsubscribe(ctx, pkt_id);
if (rc != 0) {
return -EINVAL;
}
return 0;
}
int mqtt_rx_publish(struct mqtt_ctx *ctx, struct net_buf *rx)
{
struct mqtt_publish_msg msg;
int rc;
rc = mqtt_unpack_publish(rx->data, rx->len, &msg);
if (rc != 0) {
return -EINVAL;
}
rc = ctx->publish_rx(ctx, &msg, msg.pkt_id, MQTT_PUBLISH);
if (rc != 0) {
return -EINVAL;
}
switch (msg.qos) {
case MQTT_QoS2:
rc = mqtt_tx_pubrec(ctx, msg.pkt_id);
break;
case MQTT_QoS1:
rc = mqtt_tx_puback(ctx, msg.pkt_id);
break;
case MQTT_QoS0:
break;
default:
rc = -EINVAL;
}
return rc;
}
/**
* Linearizes an IP fragmented packet
*
* @param [in] ctx MQTT context structure
* @param [in] rx RX IP stack packet
* @param [in] min_size Min message size allowed. This allows us to exit if the
* rx packet is shorter than the expected msg size
*
* @retval Data buffer
* @retval NULL on error
*/
static
struct net_buf *mqtt_linearize_packet(struct mqtt_ctx *ctx, struct net_pkt *rx,
u16_t min_size)
{
struct net_buf *data = NULL;
u16_t data_len;
u16_t offset;
int rc;
/* CONFIG_MQTT_MSG_MAX_SIZE is defined via Kconfig. So here it's
* determined if the input packet could fit our data buffer or if
* it has the expected size.
*/
data_len = net_pkt_appdatalen(rx);
if (data_len < min_size || data_len > CONFIG_MQTT_MSG_MAX_SIZE) {
return NULL;
}
data = net_buf_alloc(&mqtt_msg_pool, ctx->net_timeout);
if (data == NULL) {
return NULL;
}
offset = net_pkt_get_len(rx) - data_len;
rc = net_frag_linear_copy(data, rx->frags, offset, data_len);
if (rc != 0) {
goto exit_error;
}
return data;
exit_error:
net_pkt_frag_unref(data);
return NULL;
}
/**
* Calls the appropriate rx routine for the MQTT message contained in rx
*
* @details On error, this routine will execute the 'ctx->malformed' callback
* (if defined)
*
* @param ctx MQTT context
* @param rx RX packet
*
* @retval 0 on success
* @retval -EINVAL if an unknown message is received
* @retval -ENOMEM if no data buffer is available
* @retval mqtt_rx_connack, mqtt_rx_pingresp, mqtt_rx_puback, mqtt_rx_pubcomp,
* mqtt_rx_publish, mqtt_rx_pubrec, mqtt_rx_pubrel and mqtt_rx_suback
* return codes
*/
static
int mqtt_parser(struct mqtt_ctx *ctx, struct net_pkt *rx)
{
u16_t pkt_type = MQTT_INVALID;
struct net_buf *data = NULL;
int rc = -EINVAL;
data = mqtt_linearize_packet(ctx, rx, MQTT_PUBLISHER_MIN_MSG_SIZE);
if (!data) {
return -ENOMEM;
}
pkt_type = MQTT_PACKET_TYPE(data->data[0]);
switch (pkt_type) {
case MQTT_CONNACK:
if (!ctx->connected) {
rc = mqtt_rx_connack(ctx, data, ctx->clean_session);
} else {
rc = -EINVAL;
}
break;
case MQTT_PUBACK:
rc = mqtt_rx_puback(ctx, data);
break;
case MQTT_PUBREC:
rc = mqtt_rx_pubrec(ctx, data);
break;
case MQTT_PUBCOMP:
rc = mqtt_rx_pubcomp(ctx, data);
break;
case MQTT_PINGRESP:
rc = mqtt_rx_pingresp(ctx, data);
break;
case MQTT_PUBLISH:
rc = mqtt_rx_publish(ctx, data);
break;
case MQTT_PUBREL:
rc = mqtt_rx_pubrel(ctx, data);
break;
case MQTT_SUBACK:
rc = mqtt_rx_suback(ctx, data);
break;
default:
rc = -EINVAL;
break;
}
if (rc != 0 && ctx->malformed) {
ctx->malformed(ctx, pkt_type);
}
net_pkt_frag_unref(data);
return rc;
}
static
void mqtt_recv(struct net_context *net_ctx, struct net_pkt *pkt, int status,
void *data)
{
struct mqtt_ctx *mqtt = (struct mqtt_ctx *)data;
/* net_ctx is already referenced to by the mqtt_ctx struct */
ARG_UNUSED(net_ctx);
if (status || !pkt) {
return;
}
if (net_pkt_appdatalen(pkt) == 0) {
goto lb_exit;
}
mqtt->rcv(mqtt, pkt);
lb_exit:
net_pkt_unref(pkt);
}
int mqtt_init(struct mqtt_ctx *ctx, enum mqtt_app app_type)
{
/* So far, only clean session = 1 is supported */
ctx->clean_session = 1;
ctx->connected = 0;
ctx->app_type = app_type;
ctx->rcv = mqtt_parser;
/* Install the receiver callback, timeout is set to K_NO_WAIT.
* In this case, no return code is evaluated.
*/
(void)net_context_recv(ctx->net_ctx, mqtt_recv, K_NO_WAIT, ctx);
return 0;
}