blob: bc79f04253ce3c427e70d4b27bbcd77005f9cdee [file] [log] [blame]
/* Protocol implementation. */
/*
* Copyright (c) 2018-2019 Linaro Ltd
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <logging/log.h>
LOG_MODULE_DECLARE(net_google_iot_mqtt, LOG_LEVEL_DBG);
#include "protocol.h"
#include <zephyr.h>
#include <string.h>
#include <data/jwt.h>
#include <drivers/entropy.h>
#include <net/tls_credentials.h>
#include <net/mqtt.h>
#include <mbedtls/platform.h>
#include <mbedtls/ssl.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>
#include <mbedtls/debug.h>
extern int64_t time_base;
/* private key information */
extern unsigned char zepfull_private_der[];
extern unsigned int zepfull_private_der_len;
/*
* This is the hard-coded root certificate that we accept.
*/
#include "globalsign.inc"
static uint8_t client_id[] = CONFIG_CLOUD_CLIENT_ID;
static uint8_t client_username[] = "none";
static uint8_t pub_topic[] = CONFIG_CLOUD_PUBLISH_TOPIC;
static struct mqtt_publish_param pub_data;
static uint8_t token[512];
static bool connected;
static uint64_t next_alive;
/* The mqtt client struct */
static struct mqtt_client client_ctx;
/* MQTT Broker details. */
static struct sockaddr_storage broker;
/* Buffers for MQTT client. */
static uint8_t rx_buffer[1024];
static uint8_t tx_buffer[1024];
static sec_tag_t m_sec_tags[] = {
#if defined(MBEDTLS_X509_CRT_PARSE_C)
1,
#endif
#if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
APP_PSK_TAG,
#endif
};
/* Zephyr implementation of POSIX `time`. Has to be called k_time
* because time is already taken by newlib. The clock will be set by
* the SNTP client when it receives the time. We make no attempt to
* adjust it smoothly, and it should not be used for measuring
* intervals. Use `k_uptime_get()` directly for that. Also the
* time_t defined by newlib is a signed 32-bit value, and will
* overflow in 2037.
*/
time_t my_k_time(time_t *ptr)
{
int64_t stamp;
time_t now;
stamp = k_uptime_get();
now = (time_t)((stamp + time_base) / 1000);
if (ptr) {
*ptr = now;
}
return now;
}
void mqtt_subscribe_config(struct mqtt_client *const client)
{
#ifdef CONFIG_CLOUD_SUBSCRIBE_CONFIG
/* subscribe to config information */
struct mqtt_topic subs_topic = {
.topic = {
.utf8 = CONFIG_CLOUD_SUBSCRIBE_CONFIG,
.size = strlen(CONFIG_CLOUD_SUBSCRIBE_CONFIG)
},
.qos = MQTT_QOS_1_AT_LEAST_ONCE
};
const struct mqtt_subscription_list subs_list = {
.list = &subs_topic,
.list_count = 1U,
.message_id = 1U
};
int err;
err = mqtt_subscribe(client, &subs_list);
if (err) {
LOG_ERR("Failed to subscribe to %s item, error %d",
subs_topic.topic.utf8, err);
}
#endif
}
void mqtt_evt_handler(struct mqtt_client *const client,
const struct mqtt_evt *evt)
{
switch (evt->type) {
case MQTT_EVT_SUBACK:
LOG_INF("SUBACK packet id: %u", evt->param.suback.message_id);
break;
case MQTT_EVT_UNSUBACK:
LOG_INF("UNSUBACK packet id: %u",
evt->param.suback.message_id);
break;
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT connect failed %d", evt->result);
break;
}
connected = true;
LOG_INF("MQTT client connected!");
mqtt_subscribe_config(client);
break;
case MQTT_EVT_DISCONNECT:
LOG_INF("MQTT client disconnected %d", evt->result);
connected = false;
break;
#ifdef CONFIG_CLOUD_SUBSCRIBE_CONFIG
case MQTT_EVT_PUBLISH: {
const struct mqtt_publish_param *pub = &evt->param.publish;
uint8_t d[33];
int len = pub->message.payload.len;
int bytes_read;
LOG_INF("MQTT publish received %d, %d bytes",
evt->result, len);
LOG_INF(" id: %d, qos: %d",
pub->message_id,
pub->message.topic.qos);
LOG_INF(" item: %s",
log_strdup(pub->message.topic.topic.utf8));
/* assuming the config message is textual */
while (len) {
bytes_read = mqtt_read_publish_payload_blocking(
client, d,
len >= 32 ? 32 : len);
if (bytes_read < 0) {
LOG_ERR("failure to read payload");
break;
}
d[bytes_read] = '\0';
LOG_INF(" payload: %s", log_strdup(d));
len -= bytes_read;
}
/* for MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
struct mqtt_puback_param puback = {
.message_id = pub->message_id
};
mqtt_publish_qos1_ack(client, &puback);
}
break;
}
#endif
case MQTT_EVT_PUBACK:
if (evt->result != 0) {
LOG_ERR("MQTT PUBACK error %d", evt->result);
break;
}
/* increment message id for when we send next message */
pub_data.message_id += 1U;
LOG_INF("PUBACK packet id: %u",
evt->param.puback.message_id);
break;
default:
LOG_INF("MQTT event received %d", evt->type);
break;
}
}
static int wait_for_input(int timeout)
{
int res;
struct zsock_pollfd fds[1] = {
[0] = {.fd = client_ctx.transport.tls.sock,
.events = ZSOCK_POLLIN,
.revents = 0},
};
res = zsock_poll(fds, 1, timeout);
if (res < 0) {
LOG_ERR("poll read event error");
return -errno;
}
return res;
}
#define ALIVE_TIME (60 * MSEC_PER_SEC)
static struct mqtt_utf8 password = {
.utf8 = token
};
static struct mqtt_utf8 username = {
.utf8 = client_username,
.size = sizeof(client_username)
};
void mqtt_startup(char *hostname, int port)
{
int err, cnt;
char pub_msg[64];
struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
struct mqtt_client *client = &client_ctx;
struct jwt_builder jb;
static struct zsock_addrinfo hints;
struct zsock_addrinfo *haddr;
int res = 0;
int retries = 5;
mbedtls_platform_set_time(my_k_time);
err = tls_credential_add(1, TLS_CREDENTIAL_CA_CERTIFICATE,
globalsign_certificate,
sizeof(globalsign_certificate));
if (err < 0) {
LOG_ERR("Failed to register public certificate: %d", err);
}
while (retries) {
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
cnt = 0;
while ((err = getaddrinfo("mqtt.googleapis.com", "8883", &hints,
&haddr)) && cnt < 3) {
LOG_ERR("Unable to get address for broker, retrying");
cnt++;
}
if (err != 0) {
LOG_ERR("Unable to get address for broker, error %d",
err);
return;
}
LOG_INF("DNS resolved for mqtt.googleapis.com:8883");
mqtt_client_init(client);
time_t now = my_k_time(NULL);
res = jwt_init_builder(&jb, token, sizeof(token));
if (res != 0) {
LOG_ERR("Error with JWT token");
return;
}
res = jwt_add_payload(&jb, now + 60 * 60, now,
CONFIG_CLOUD_AUDIENCE);
if (res != 0) {
LOG_ERR("Error with JWT token");
return;
}
res = jwt_sign(&jb, zepfull_private_der,
zepfull_private_der_len);
if (res != 0) {
LOG_ERR("Error with JWT token");
return;
}
broker4->sin_family = AF_INET;
broker4->sin_port = htons(port);
net_ipaddr_copy(&broker4->sin_addr,
&net_sin(haddr->ai_addr)->sin_addr);
/* MQTT client configuration */
client->broker = &broker;
client->evt_cb = mqtt_evt_handler;
client->client_id.utf8 = client_id;
client->client_id.size = strlen(client_id);
client->password = &password;
password.size = jwt_payload_len(&jb);
client->user_name = &username;
client->protocol_version = MQTT_VERSION_3_1_1;
/* MQTT buffers configuration */
client->rx_buf = rx_buffer;
client->rx_buf_size = sizeof(rx_buffer);
client->tx_buf = tx_buffer;
client->tx_buf_size = sizeof(tx_buffer);
/* MQTT transport configuration */
client->transport.type = MQTT_TRANSPORT_SECURE;
struct mqtt_sec_config *tls_config =
&client->transport.tls.config;
tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = m_sec_tags;
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
tls_config->hostname = hostname;
LOG_INF("Connecting to host: %s", hostname);
err = mqtt_connect(client);
if (err != 0) {
LOG_ERR("could not connect, error %d", err);
mqtt_disconnect(client);
retries--;
k_msleep(ALIVE_TIME);
continue;
}
if (wait_for_input(5 * MSEC_PER_SEC) > 0) {
mqtt_input(client);
if (!connected) {
LOG_ERR("failed to connect to mqtt_broker");
mqtt_disconnect(client);
retries--;
k_msleep(ALIVE_TIME);
continue;
} else {
break;
}
} else {
LOG_ERR("failed to connect to mqtt broker");
mqtt_disconnect(client);
retries--;
k_msleep(ALIVE_TIME);
continue;
}
}
if (!connected) {
LOG_ERR("Failed to connect to client, aborting");
return;
}
/* initialize publish structure */
pub_data.message.topic.topic.utf8 = pub_topic;
pub_data.message.topic.topic.size = strlen(pub_topic);
pub_data.message.topic.qos = MQTT_QOS_1_AT_LEAST_ONCE;
pub_data.message.payload.data = (uint8_t *)pub_msg;
pub_data.message_id = 1U;
pub_data.dup_flag = 0U;
pub_data.retain_flag = 1U;
mqtt_live(client);
next_alive = k_uptime_get() + ALIVE_TIME;
while (1) {
LOG_INF("Publishing data");
sprintf(pub_msg, "%s: %d\n", "payload", pub_data.message_id);
pub_data.message.payload.len = strlen(pub_msg);
err = mqtt_publish(client, &pub_data);
if (err) {
LOG_ERR("could not publish, error %d", err);
break;
}
/* idle and process messages */
while (k_uptime_get() < next_alive) {
LOG_INF("... idling ...");
if (wait_for_input(5 * MSEC_PER_SEC) > 0) {
mqtt_input(client);
}
}
mqtt_live(client);
next_alive += ALIVE_TIME;
}
}