blob: cc9ce90f2630dc846b87a663f4f31079c90b5c54 [file] [log] [blame]
/*
* Copyright (c) 2017 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <ztest.h>
#include <net/mqtt.h>
#include <net/net_context.h>
#include <net/net_pkt.h>
#include <misc/printk.h>
#include <string.h>
#include <errno.h>
#include "config.h"
/* Container for some structures used by the MQTT subscriber app. */
struct mqtt_client_ctx {
/**
* The connect message structure is only used during the connect
* stage. Developers must set some msg properties before calling the
* mqtt_tx_connect routine. See below.
*/
struct mqtt_connect_msg connect_msg;
/**
* This is the message that will be received by the server
* (MQTT broker).
*/
struct mqtt_publish_msg pub_msg;
/**
* This is the MQTT application context variable.
*/
struct mqtt_ctx mqtt_ctx;
/**
* This variable will be passed to the connect callback, declared inside
* the mqtt context struct. If not used, it could be set to NULL.
*/
void *connect_data;
/**
* This variable will be passed to the disconnect callback, declared
* inside the mqtt context struct. If not used, it could be set to NULL.
*/
void *disconnect_data;
/**
* This variable will be passed to the subscribe_tx callback, declared
* inside the mqtt context struct. If not used, it could be set to NULL.
*/
void *subscribe_data;
/**
* This variable will be passed to the unsubscribe_tx callback, declared
* inside the mqtt context struct. If not used, it could be set to NULL.
*/
void *unsubscribe_data;
};
/* The mqtt client struct */
static struct mqtt_client_ctx client_ctx;
/* The signature of this routine must match the connect callback declared at
* the mqtt.h header.
*/
static void connect_cb(struct mqtt_ctx *mqtt_ctx)
{
struct mqtt_client_ctx *client_ctx;
client_ctx = CONTAINER_OF(mqtt_ctx, struct mqtt_client_ctx, mqtt_ctx);
printk("[%s:%d]", __func__, __LINE__);
if (client_ctx->connect_data) {
printk(" user_data: %s",
(const char *)client_ctx->connect_data);
}
printk("\n");
}
/* The signature of this routine must match the disconnect callback declared at
* the mqtt.h header.
*/
static void disconnect_cb(struct mqtt_ctx *mqtt_ctx)
{
struct mqtt_client_ctx *client_ctx;
client_ctx = CONTAINER_OF(mqtt_ctx, struct mqtt_client_ctx, mqtt_ctx);
printk("[%s:%d]", __func__, __LINE__);
if (client_ctx->disconnect_data) {
printk(" user_data: %s",
(const char *)client_ctx->disconnect_data);
}
printk("\n");
}
/**
* The signature of this routine must match the publish_rx callback declared at
* the mqtt.h header.
*
* NOTE: we have two callbacks for MQTT Publish related stuff:
* - publish_tx, for publishers
* - publish_rx, for subscribers
*
* Applications must keep a "message database" with pkt_id's. So far, this is
* not implemented here. For example, if we receive a PUBREC message with an
* unknown pkt_id, this routine must return an error, for example -EINVAL or
* any negative value.
*/
static int publish_rx_cb(struct mqtt_ctx *mqtt_ctx, struct mqtt_publish_msg
*msg, u16_t pkt_id, enum mqtt_packet type)
{
struct mqtt_client_ctx *client_ctx;
const char *str;
int rc = 0;
client_ctx = CONTAINER_OF(mqtt_ctx, struct mqtt_client_ctx, mqtt_ctx);
switch (type) {
case MQTT_PUBLISH:
str = "MQTT_PUBLISH";
printk("[%s:%d] <%s> msg: %s", __func__, __LINE__,
str, msg->msg);
break;
case MQTT_PUBREL:
str = "MQTT_PUBREL";
printk("[%s:%d] <%s> packet id: %u", __func__, __LINE__,
str, pkt_id);
return 0;
default:
rc = -EINVAL;
str = "Invalid MQTT packet";
}
if (client_ctx->subscribe_data) {
printk(", user_data: %s",
(const char *)client_ctx->subscribe_data);
}
printk("\n");
return rc;
}
/**
* The signature of this routine must match the subscribe callback declared at
* the mqtt.h header.
*/
static int subscriber_cb(struct mqtt_ctx *mqtt_ctx, u16_t pkt_id,
u8_t items, enum mqtt_qos qos[])
{
struct mqtt_client_ctx *client_ctx;
client_ctx = CONTAINER_OF(mqtt_ctx, struct mqtt_client_ctx, mqtt_ctx);
printk("[%s:%d] items: %d packet id: %u", __func__, __LINE__,
items, pkt_id);
if (client_ctx->subscribe_data) {
printk(" user_data: %s",
(const char *)client_ctx->subscribe_data);
}
printk("\n");
return 0;
}
/**
* The signature of this routine must match the unsubscribe callback declared at
* the mqtt.h header.
*/
static int unsubscribe_cb(struct mqtt_ctx *mqtt_ctx, u16_t pkt_id)
{
struct mqtt_client_ctx *client_ctx;
client_ctx = CONTAINER_OF(mqtt_ctx, struct mqtt_client_ctx, mqtt_ctx);
printk("[%s:%d] packet id: %u", __func__, __LINE__, pkt_id);
if (client_ctx->unsubscribe_data) {
printk(" user_data: %s",
(const char *)client_ctx->unsubscribe_data);
}
printk("\n");
return 0;
}
/**
* The signature of this routine must match the malformed callback declared at
* the mqtt.h header.
*/
static void malformed_cb(struct mqtt_ctx *mqtt_ctx, u16_t pkt_type)
{
printk("[%s:%d] pkt_type: %u\n", __func__, __LINE__, pkt_type);
}
static char *get_mqtt_topic(void)
{
return "sensors";
}
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
#define PRINT_RESULT(func, rc) \
printk("[%s:%d] %s: %d <%s>\n", __func__, __LINE__, \
(func), rc, RC_STR(rc))
/* In this routine we block until the connected variable is 1 */
static int try_to_connect(struct mqtt_client_ctx *client_ctx)
{
int i = 0;
while (i++ < APP_CONNECT_TRIES && !client_ctx->mqtt_ctx.connected) {
int rc;
rc = mqtt_tx_connect(&client_ctx->mqtt_ctx,
&client_ctx->connect_msg);
k_sleep(APP_SLEEP_MSECS);
if (rc != 0) {
continue;
}
}
if (client_ctx->mqtt_ctx.connected) {
return TC_PASS;
}
return TC_FAIL;
}
static int init_network(void)
{
int rc;
/* Set everything to 0 and later just assign the required fields. */
memset(&client_ctx, 0x00, sizeof(client_ctx));
/* connect, disconnect and malformed may be set to NULL */
client_ctx.mqtt_ctx.connect = connect_cb;
client_ctx.mqtt_ctx.disconnect = disconnect_cb;
client_ctx.mqtt_ctx.malformed = malformed_cb;
client_ctx.mqtt_ctx.subscribe = subscriber_cb;
client_ctx.mqtt_ctx.unsubscribe = unsubscribe_cb;
client_ctx.mqtt_ctx.net_init_timeout = APP_NET_INIT_TIMEOUT;
client_ctx.mqtt_ctx.net_timeout = APP_TX_RX_TIMEOUT;
client_ctx.mqtt_ctx.peer_addr_str = SERVER_ADDR;
client_ctx.mqtt_ctx.peer_port = SERVER_PORT;
/* Publisher apps TX the MQTT PUBLISH msg */
client_ctx.mqtt_ctx.publish_rx = publish_rx_cb;
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_SUBSCRIBER);
if (rc != 0) {
goto exit_app;
}
/* The connect message will be sent to the MQTT server (broker).
* If clean_session here is 0, the mqtt_ctx clean_session variable
* will be set to 0 also. Please don't do that, set always to 1.
* Clean session = 0 is not yet supported.
*/
client_ctx.connect_msg.client_id = MQTT_CLIENTID;
client_ctx.connect_msg.client_id_len = strlen(MQTT_CLIENTID);
client_ctx.connect_msg.clean_session = 1;
client_ctx.connect_data = "CONNECTED";
client_ctx.disconnect_data = "DISCONNECTED";
client_ctx.subscribe_data = "SUBSCRIBE";
client_ctx.unsubscribe_data = "UNSUBSCRIBE";
rc = mqtt_init(&client_ctx.mqtt_ctx, MQTT_APP_SUBSCRIBER);
if (rc != 0) {
goto exit_app;
}
rc = mqtt_connect(&client_ctx.mqtt_ctx);
if (!rc) {
goto exit_app;
}
return TC_PASS;
exit_app:
mqtt_close(&client_ctx.mqtt_ctx);
return TC_FAIL;
}
static int test_connect(void)
{
int rc;
rc = try_to_connect(&client_ctx);
if (rc != 0) {
return TC_FAIL;
}
return TC_PASS;
}
static int test_subscribe(void)
{
int rc;
const char *topic_sub = get_mqtt_topic();
u16_t pkt_id_sub = sys_rand32_get();
static enum mqtt_qos mqtt_qos_sub[1];
rc = mqtt_tx_subscribe(&client_ctx.mqtt_ctx, pkt_id_sub, 1,
&topic_sub, mqtt_qos_sub);
k_sleep(APP_SLEEP_MSECS);
if (rc != 0) {
return TC_FAIL;
}
return TC_PASS;
}
static int test_unsubscribe(void)
{
int rc;
const char *topic_sub = get_mqtt_topic();
u16_t pkt_id_unsub = sys_rand32_get();
rc = mqtt_tx_unsubscribe(&client_ctx.mqtt_ctx, pkt_id_unsub,
1, &topic_sub);
k_sleep(APP_SLEEP_MSECS);
if (rc != 0) {
return TC_FAIL;
}
return TC_PASS;
}
static int test_disconnect(void)
{
int rc;
rc = mqtt_tx_disconnect(&client_ctx.mqtt_ctx);
if (rc != 0) {
return TC_FAIL;
}
return TC_PASS;
}
void test_mqtt_init(void)
{
zassert_true(init_network() == TC_PASS, NULL);
}
void test_mqtt_connect(void)
{
zassert_true(test_connect() == TC_PASS, NULL);
}
void test_mqtt_subscribe(void)
{
zassert_true(test_subscribe() == TC_PASS, NULL);
}
void test_mqtt_unsubscribe(void)
{
zassert_true(test_unsubscribe() == TC_PASS, NULL);
}
void test_mqtt_disconnect(void)
{
zassert_true(test_disconnect() == TC_PASS, NULL);
}