blob: 38e2f2ecc2497b1b5203919a3aef7639a2c4bff9 [file] [log] [blame]
/*
* Copyright (c) 2022 grandcentrix GmbH
*
* SPDX-License-Identifier: Apache-2.0
*/
/** @file mqtt_sn.c
*
* @brief MQTT-SN Client API Implementation.
*/
#include "mqtt_sn_msg.h"
#include <zephyr/logging/log.h>
#include <zephyr/net/mqtt_sn.h>
/* Register this file's log module; verbosity comes from CONFIG_MQTT_SN_LOG_LEVEL. */
LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
/* Number of buffers in the message pool: one per publish slot. */
#define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)
/*
 * Fixed-size buffer pool, each buffer CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE bytes.
 * NOTE(review): no user of this pool is visible in this part of the file -
 * confirm it is still needed.
 */
NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
0, NULL);
/*
 * Retransmission bookkeeping embedded in every entry (publish or topic) that
 * is sent to the gateway and may have to be retried.
 */
struct mqtt_sn_confirmable {
/* Uptime of the most recent send attempt; 0 means "not sent yet". */
int64_t last_attempt;
/* Message ID used to match an incoming acknowledgement to this entry. */
uint16_t msg_id;
/* Send attempts left; initialised to N_RETRY. */
uint8_t retries;
/* True while the static slot holding this entry is allocated. */
bool in_use;
};
/* One queued outgoing PUBLISH message. */
struct mqtt_sn_publish {
/* Retry state and message ID. */
struct mqtt_sn_confirmable con;
/* Links this entry into a client's publish list. */
sys_snode_t next;
/* Topic the message is published to. */
struct mqtt_sn_topic *topic;
/* Private copy of the payload; only the first datalen bytes are valid. */
uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
size_t datalen;
enum mqtt_sn_qos qos;
bool retain;
};
/*
 * Static slot pool for publishes. A slot is free while con.in_use is false.
 * The pool is file-scope, i.e. shared by every client using this file.
 */
static struct mqtt_sn_publish pubs[CONFIG_MQTT_SN_LIB_MAX_PUBLISH];
/* Life-cycle of a topic entry as driven by process_topics() and the *ACK handlers. */
enum mqtt_sn_topic_state {
/* REGISTER is (re)sent until a REGACK is accepted. */
MQTT_SN_TOPIC_STATE_REGISTERING,
/* Topic ID is known; publishes on this topic may be sent. */
MQTT_SN_TOPIC_STATE_REGISTERED,
/* SUBSCRIBE is (re)sent until a SUBACK is accepted. */
MQTT_SN_TOPIC_STATE_SUBSCRIBING,
/* Subscription confirmed; publishes on this topic may be sent. */
MQTT_SN_TOPIC_STATE_SUBSCRIBED,
/* UNSUBSCRIBE is (re)sent; the entry is destroyed on UNSUBACK. */
MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
};
/* One topic known to a client (being registered, subscribed or unsubscribed). */
struct mqtt_sn_topic {
/* Retry state and message ID of the pending request for this topic. */
struct mqtt_sn_confirmable con;
/* Links this entry into a client's topic list. */
sys_snode_t next;
/* Topic name bytes; not NUL-terminated, only the first namelen bytes are valid. */
char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
size_t namelen;
/* Topic ID taken from the gateway's REGACK/SUBACK/REGISTER. */
uint16_t topic_id;
enum mqtt_sn_qos qos;
enum mqtt_sn_topic_type type;
enum mqtt_sn_topic_state state;
};
/*
 * Static slot pool for topics. A slot is free while con.in_use is false.
 * The pool is file-scope, i.e. shared by every client using this file.
 */
static struct mqtt_sn_topic topics[CONFIG_MQTT_SN_LIB_MAX_TOPICS];
/* Connection state of a client. */
enum mqtt_sn_client_state {
/* Not connected; process_work() does nothing in this state. */
MQTT_SN_CLIENT_DISCONNECTED,
/* Entered on an accepted CONNACK; topics, publishes and pings are processed. */
MQTT_SN_CLIENT_ACTIVE,
/* Entered by mqtt_sn_sleep(), or from AWAKE when a PINGRESP arrives. */
MQTT_SN_CLIENT_ASLEEP,
/* NOTE(review): nothing visible in this file sets this state - confirm where it is entered. */
MQTT_SN_CLIENT_AWAKE
};
/**
 * @brief Move the client to a new state and log the transition.
 *
 * @param client Client whose state is updated.
 * @param state  State to enter.
 */
static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
{
	const int old_state = client->state;

	LOG_DBG("Client %p state (%d) -> (%d)", client, old_state, state);
	client->state = state;
}
/* Delay between retransmissions of an unacknowledged message, in milliseconds. */
#define T_RETRY_MSEC (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
/* Initial value of every retry counter (confirmables and ping). */
#define N_RETRY (CONFIG_MQTT_SN_LIB_N_RETRY)
/* Keep-alive period: delay before the next PINGREQ once the last one was answered. */
#define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
/**
 * @brief Produce the next message ID.
 *
 * IDs come from a single file-wide counter starting at 1. The value 0 is
 * never returned: MQTT-SN codes the MsgId field as 0x0000 when the ID is not
 * relevant, so a confirmable message must not use it. On wrap-around the
 * counter therefore restarts at 1.
 *
 * @return A message ID in the range 1..65535.
 */
static uint16_t next_msg_id(void)
{
	static uint16_t msg_id;

	msg_id++;
	if (msg_id == 0) {
		/* Counter wrapped: skip the reserved value. */
		msg_id = 1;
	}

	return msg_id;
}
/**
 * @brief Encode a message into the client's TX buffer and hand it to the transport.
 *
 * The TX buffer is reset before returning, whether or not sending succeeded.
 *
 * @param client Client that owns the TX buffer and transport.
 * @param p      Message to encode.
 *
 * @return 0 on success, the encoder's error code, -ENOTSUP if the transport
 *         has no msg_send callback, -ENOMSG if encoding produced no bytes,
 *         or the error returned by the transport's msg_send.
 */
static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p)
{
	int ret = mqtt_sn_encode_msg(&client->tx, p);

	if (ret == 0) {
		LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");

		if (client->transport->msg_send == NULL) {
			LOG_ERR("Can't send: no callback");
			ret = -ENOTSUP;
		} else if (client->tx.len == 0) {
			LOG_WRN("Can't send: empty");
			ret = -ENOMSG;
		} else {
			ret = client->transport->msg_send(client, client->tx.data,
							  client->tx.len);
			if (ret != 0) {
				LOG_ERR("Error during send: %d", ret);
			}
		}
	}

	/* Always leave the TX buffer empty for the next message. */
	net_buf_simple_reset(&client->tx);

	return ret;
}
/**
 * @brief Prepare a confirmable for its first transmission.
 *
 * Marks the slot as used, assigns a fresh message ID, refills the retry
 * counter and clears the last-attempt timestamp.
 *
 * @param con Confirmable to initialise.
 */
static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
{
	con->in_use = true;
	con->msg_id = next_msg_id();
	con->retries = N_RETRY;
	con->last_attempt = 0;
}
/**
 * @brief Remove one publish from the client's list and free its slot.
 *
 * Zeroing the entry clears con.in_use, which returns the slot to the pool.
 *
 * @param client Client whose publish list contains @p pub.
 * @param pub    Publish to release.
 */
static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
{
	(void)sys_slist_find_and_remove(&client->publish, &pub->next);

	memset(pub, 0, sizeof(struct mqtt_sn_publish));
}
static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
{
struct mqtt_sn_publish *pub;
sys_snode_t *next;
while ((next = sys_slist_get(&client->publish)) != NULL) {
pub = SYS_SLIST_CONTAINER(next, pub, next);
memset(pub, 0, sizeof(*pub));
}
}
static struct mqtt_sn_publish *mqtt_sn_publish_find_empty(void)
{
size_t i;
for (i = 0; i < ARRAY_SIZE(pubs); i++) {
if (!pubs[i].con.in_use) {
return &pubs[i];
}
}
return NULL;
}
/**
 * @brief Allocate a publish slot and copy the payload into it.
 *
 * A NULL @p data, NULL payload pointer or zero size yields a publish with no
 * payload. The caller still has to fill in topic, QoS and retain, and link
 * the entry into a client's publish list.
 *
 * @param data Payload to copy; may be NULL.
 *
 * @return The initialised slot, or NULL if no slot is free or the payload
 *         does not fit into the slot's buffer.
 */
static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
{
	struct mqtt_sn_publish *slot = mqtt_sn_publish_find_empty();
	bool has_payload;

	if (slot == NULL) {
		LOG_ERR("Can't create PUB: no free slot");
		return NULL;
	}

	has_payload = (data != NULL) && (data->data != NULL) && (data->size != 0);

	if (has_payload && data->size > sizeof(slot->pubdata)) {
		LOG_ERR("Can't create PUB: Too much data (%" PRIu16 ")", data->size);
		return NULL;
	}

	if (has_payload) {
		memcpy(slot->pubdata, data->data, data->size);
		slot->datalen = data->size;
	}

	/* Only now is the slot marked as used. */
	mqtt_sn_con_init(&slot->con);

	return slot;
}
/**
 * @brief Look up a queued publish by its message ID.
 *
 * @param client Client whose publish list is searched.
 * @param msg_id Message ID to match.
 *
 * @return The first matching publish, or NULL if none matches.
 */
static struct mqtt_sn_publish *mqtt_sn_publish_find_msg_id(struct mqtt_sn_client *client,
							   uint16_t msg_id)
{
	struct mqtt_sn_publish *match = NULL;
	struct mqtt_sn_publish *cur;

	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, cur, next) {
		if (cur->con.msg_id == msg_id) {
			match = cur;
			break;
		}
	}

	return match;
}
/**
 * @brief Find a queued publish that targets the given topic.
 *
 * @param client Client whose publish list is searched.
 * @param topic  Topic entry to match by pointer identity.
 *
 * @return The first publish referencing @p topic, or NULL if there is none.
 */
static struct mqtt_sn_publish *mqtt_sn_publish_find_topic(struct mqtt_sn_client *client,
							  struct mqtt_sn_topic *topic)
{
	struct mqtt_sn_publish *match = NULL;
	struct mqtt_sn_publish *cur;

	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, cur, next) {
		if (cur->topic == topic) {
			match = cur;
			break;
		}
	}

	return match;
}
static struct mqtt_sn_topic *mqtt_sn_topic_find_empty(void)
{
size_t i;
for (i = 0; i < ARRAY_SIZE(topics); i++) {
if (!topics[i].con.in_use) {
return &topics[i];
}
}
return NULL;
}
/**
 * @brief Allocate a topic slot and copy the topic name into it.
 *
 * The caller still has to set state, QoS, type and topic ID as needed and
 * link the entry into a client's topic list.
 *
 * @param name Topic name; must be non-NULL with non-NULL data and non-zero size.
 *
 * @return The initialised slot, or NULL if no slot is free, the name is
 *         empty, or the name does not fit into the slot's buffer.
 */
static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
{
	struct mqtt_sn_topic *slot = mqtt_sn_topic_find_empty();

	if (slot == NULL) {
		LOG_ERR("Can't create topic: no free slot");
		return NULL;
	}

	if (name == NULL || name->data == NULL || name->size == 0) {
		LOG_ERR("Can't create topic with empty name");
		return NULL;
	}

	if (name->size > sizeof(slot->name)) {
		LOG_ERR("Can't create topic: name too long (%" PRIu16 ")", name->size);
		return NULL;
	}

	slot->namelen = name->size;
	memcpy(slot->name, name->data, name->size);

	/* Only now is the slot marked as used. */
	mqtt_sn_con_init(&slot->con);

	return slot;
}
/**
 * @brief Look up a topic of the client by name.
 *
 * Names are compared as raw bytes; lengths must be equal.
 *
 * @param client     Client whose topic list is searched.
 * @param topic_name Name to look for; dereferenced without a NULL check.
 *
 * @return The matching topic, or NULL if the client does not know the name.
 */
static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *client,
						     struct mqtt_sn_data *topic_name)
{
	struct mqtt_sn_topic *cur;

	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, cur, next) {
		if (cur->namelen != topic_name->size) {
			continue;
		}

		if (memcmp(cur->name, topic_name->data, topic_name->size) == 0) {
			return cur;
		}
	}

	return NULL;
}
/**
 * @brief Look up a topic of the client by the message ID of its pending request.
 *
 * @param client Client whose topic list is searched.
 * @param msg_id Message ID to match.
 *
 * @return The first matching topic, or NULL if none matches.
 */
static struct mqtt_sn_topic *mqtt_sn_topic_find_msg_id(struct mqtt_sn_client *client,
						       uint16_t msg_id)
{
	struct mqtt_sn_topic *match = NULL;
	struct mqtt_sn_topic *cur;

	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, cur, next) {
		if (cur->con.msg_id == msg_id) {
			match = cur;
			break;
		}
	}

	return match;
}
/**
 * @brief Remove a topic from the client and free its slot.
 *
 * Queued publishes that reference the topic are destroyed first so that no
 * publish is left pointing at a freed slot.
 *
 * @param client Client that owns the topic.
 * @param topic  Topic to release.
 */
static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
{
	struct mqtt_sn_publish *orphan;

	for (orphan = mqtt_sn_publish_find_topic(client, topic); orphan != NULL;
	     orphan = mqtt_sn_publish_find_topic(client, topic)) {
		LOG_WRN("Destroying publish msg_id %d", orphan->con.msg_id);
		mqtt_sn_publish_destroy(client, orphan);
	}

	(void)sys_slist_find_and_remove(&client->topic, &topic->next);
	memset(topic, 0, sizeof(struct mqtt_sn_topic));
}
static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
{
struct mqtt_sn_topic *topic;
struct mqtt_sn_publish *pub;
sys_snode_t *next;
while ((next = sys_slist_get(&client->topic)) != NULL) {
topic = SYS_SLIST_CONTAINER(next, topic, next);
/* Destroy all pubs referencing this topic */
while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
mqtt_sn_publish_destroy(client, pub);
}
memset(topic, 0, sizeof(*topic));
}
}
static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
/*
* Remove all publishes, but keep topics
* Topics are removed on deinit or when connect is called with
* clean-session = true
*/
mqtt_sn_publish_destroy_all(client);
k_work_cancel_delayable(&client->process_work);
}
static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
}
static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
bool dup)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
if (!client || !topic) {
return;
}
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
LOG_ERR("Cannot subscribe: not connected");
return;
}
p.params.subscribe.msg_id = topic->con.msg_id;
p.params.subscribe.qos = topic->qos;
p.params.subscribe.topic_type = topic->type;
p.params.subscribe.dup = dup;
switch (topic->type) {
case MQTT_SN_TOPIC_TYPE_NORMAL:
p.params.subscribe.topic.topic_name.data = topic->name;
p.params.subscribe.topic.topic_name.size = topic->namelen;
break;
case MQTT_SN_TOPIC_TYPE_PREDEF:
case MQTT_SN_TOPIC_TYPE_SHORT:
p.params.subscribe.topic.topic_id = topic->topic_id;
break;
default:
LOG_ERR("Unexpected topic type %d", topic->type);
return;
}
encode_and_send(client, &p);
}
static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
if (!client || !topic) {
return;
}
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
LOG_ERR("Cannot unsubscribe: not connected");
return;
}
p.params.unsubscribe.msg_id = topic->con.msg_id;
p.params.unsubscribe.topic_type = topic->type;
switch (topic->type) {
case MQTT_SN_TOPIC_TYPE_NORMAL:
p.params.unsubscribe.topic.topic_name.data = topic->name;
p.params.unsubscribe.topic.topic_name.size = topic->namelen;
break;
case MQTT_SN_TOPIC_TYPE_PREDEF:
case MQTT_SN_TOPIC_TYPE_SHORT:
p.params.unsubscribe.topic.topic_id = topic->topic_id;
break;
default:
LOG_ERR("Unexpected topic type %d", topic->type);
return;
}
encode_and_send(client, &p);
}
static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
if (!client || !topic) {
return;
}
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
LOG_ERR("Cannot register: not connected");
return;
}
p.params.reg.msg_id = topic->con.msg_id;
switch (topic->type) {
case MQTT_SN_TOPIC_TYPE_NORMAL:
LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
p.params.reg.topic.data = topic->name;
p.params.reg.topic.size = topic->namelen;
break;
default:
LOG_ERR("Unexpected topic type %d", topic->type);
return;
}
encode_and_send(client, &p);
}
/**
 * @brief Encode and send a PUBLISH for a queued publish entry.
 *
 * Nothing is sent if either argument is NULL or the client is not active.
 * The result of sending is not reported. pub->topic is dereferenced without
 * a check, so it must point to a valid topic.
 *
 * @param client Client to send on.
 * @param pub    Queued publish carrying payload, QoS, retain flag and topic.
 * @param dup    Value of the DUP flag (true for a retransmission).
 */
static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
{
	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};

	if (!client || !pub) {
		return;
	}

	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
		/* The message used to say "subscribe" (copy-paste from the subscribe path). */
		LOG_ERR("Cannot publish: not connected");
		return;
	}

	LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);

	p.params.publish.data.data = pub->pubdata;
	p.params.publish.data.size = pub->datalen;
	p.params.publish.msg_id = pub->con.msg_id;
	p.params.publish.retain = pub->retain;
	p.params.publish.topic_id = pub->topic->topic_id;
	p.params.publish.topic_type = pub->topic->type;
	p.params.publish.qos = pub->qos;
	p.params.publish.dup = dup;

	encode_and_send(client, &p);
}
static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
switch (client->state) {
case MQTT_SN_CLIENT_ASLEEP:
/*
* From the spec regarding PINGREQ:
* ClientId: contains the client id; this field is optional
* and is included by a “sleeping” client when it goes to the
* “awake” state and is waiting for messages sent by the
* server/gateway, see Section 6.14 for further details.
*/
p.params.pingreq.client_id.data = client->client_id.data;
p.params.pingreq.client_id.size = client->client_id.size;
case MQTT_SN_CLIENT_ACTIVE:
encode_and_send(client, &p);
break;
default:
LOG_WRN("Can't ping in state %d", client->state);
break;
}
}
/**
 * @brief Send or retransmit the client's queued publishes that are due.
 *
 * A publish is due immediately if it was never sent, otherwise T_RETRY_MSEC
 * after its last attempt. A due publish is only sent when its topic is
 * registered or subscribed. QoS 0 / QoS -1 publishes are destroyed right
 * after sending; others stay queued until acknowledged.
 *
 * @p next_cycle is merged, not reset: the earliest pending deadline is kept,
 * with 0 meaning "no deadline yet". The caller initialises it. (Resetting it
 * here would throw away the deadline computed by process_topics(), which
 * runs first.)
 *
 * @param client     Client whose publish list is processed.
 * @param next_cycle In/out: earliest uptime at which processing must run again.
 *
 * @return 0 on success, -ETIMEDOUT if a publish ran out of retries (the
 *         client has been disconnected in that case).
 */
static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
{
	/* "tmp" rather than "pubs": avoids shadowing the file-scope slot array. */
	struct mqtt_sn_publish *pub, *tmp;
	const int64_t now = k_uptime_get();
	int64_t next_attempt;
	bool dup;

	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, tmp, next) {
		LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
				"Processing publish for topic");
		LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");

		if (pub->con.last_attempt == 0) {
			/* Never sent: due right away, not a duplicate. */
			next_attempt = 0;
			dup = false;
		} else {
			next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
			dup = true;
		}

		if (next_attempt <= now) {
			switch (pub->topic->state) {
			case MQTT_SN_TOPIC_STATE_REGISTERING:
			case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
				LOG_INF("Can't publish; topic is not ready");
				break;
			case MQTT_SN_TOPIC_STATE_REGISTERED:
			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
				if (!pub->con.retries--) {
					LOG_WRN("PUB ran out of retries, disconnecting");
					/* This destroys every queued publish; stop iterating. */
					mqtt_sn_disconnect_internal(client);
					return -ETIMEDOUT;
				}
				mqtt_sn_do_publish(client, pub, dup);
				if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
					/* We are done, remove this */
					mqtt_sn_publish_destroy(client, pub);
					continue;
				} else {
					/* Wait for ack */
					pub->con.last_attempt = now;
					next_attempt = now + T_RETRY_MSEC;
				}
				break;
			}
		}

		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
			*next_cycle = next_attempt;
		}
	}

	return 0;
}
/**
 * @brief Send or retransmit pending topic requests that are due.
 *
 * Topics in the SUBSCRIBING, REGISTERING or UNSUBSCRIBING state have their
 * request (re)sent when due: immediately if never sent, otherwise
 * T_RETRY_MSEC after the last attempt. Topics in other states are left alone.
 *
 * @param client     Client whose topic list is processed.
 * @param next_cycle In/out: earliest uptime at which processing must run
 *                   again; 0 means "no deadline yet".
 *
 * @return 0 on success, -ETIMEDOUT if a topic ran out of retries (the client
 *         has been disconnected in that case).
 */
static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
{
	const int64_t now = k_uptime_get();
	struct mqtt_sn_topic *topic;

	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
		const bool first_try = (topic->con.last_attempt == 0);
		int64_t due = first_try ? 0 : topic->con.last_attempt + T_RETRY_MSEC;
		bool awaiting_ack;

		LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");

		awaiting_ack = topic->state == MQTT_SN_TOPIC_STATE_SUBSCRIBING ||
			       topic->state == MQTT_SN_TOPIC_STATE_REGISTERING ||
			       topic->state == MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;

		if (due <= now && awaiting_ack) {
			if (!topic->con.retries--) {
				LOG_WRN("Topic ran out of retries, disconnecting");
				mqtt_sn_disconnect_internal(client);
				return -ETIMEDOUT;
			}

			if (topic->state == MQTT_SN_TOPIC_STATE_SUBSCRIBING) {
				/* Any attempt after the first is flagged as a duplicate. */
				mqtt_sn_do_subscribe(client, topic, !first_try);
			} else if (topic->state == MQTT_SN_TOPIC_STATE_REGISTERING) {
				mqtt_sn_do_register(client, topic);
			} else {
				mqtt_sn_do_unsubscribe(client, topic);
			}

			topic->con.last_attempt = now;
			due = now + T_RETRY_MSEC;
		}

		if (due > now && (*next_cycle == 0 || due < *next_cycle)) {
			*next_cycle = due;
		}
	}

	return 0;
}
/**
 * @brief Send a keep-alive PINGREQ when one is due.
 *
 * If the retry counter is full (last ping was acknowledged) the next ping is
 * due T_KEEPALIVE_MSEC after the last one, otherwise T_RETRY_MSEC after it.
 *
 * @param client     Client to ping from.
 * @param next_cycle In/out: earliest uptime at which processing must run
 *                   again; 0 means "no deadline yet".
 *
 * @return 0 on success, -ETIMEDOUT if the ping ran out of retries (the
 *         client has been disconnected in that case).
 */
static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
{
	const int64_t now = k_uptime_get();
	const int64_t interval =
		(client->ping_retries == N_RETRY) ? T_KEEPALIVE_MSEC : T_RETRY_MSEC;
	int64_t due = client->last_ping + interval;

	if (due < now) {
		if (!client->ping_retries--) {
			LOG_WRN("Ping ran out of retries");
			mqtt_sn_disconnect_internal(client);
			return -ETIMEDOUT;
		}

		LOG_DBG("Sending PINGREQ");
		mqtt_sn_do_ping(client);

		client->last_ping = now;
		due = now + T_RETRY_MSEC;
	}

	if (*next_cycle == 0 || due < *next_cycle) {
		*next_cycle = due;
	}

	return 0;
}
/**
 * @brief Work handler that drives retransmissions and keep-alive.
 *
 * For an active client it processes topics, then publishes, then the ping,
 * and reschedules itself for the earliest deadline they report. If any step
 * fails the handler returns without rescheduling. Nothing is processed in
 * the other states.
 *
 * @param wrk Work item embedded in the client's delayable process_work.
 */
static void process_work(struct k_work *wrk)
{
	struct k_work_delayable *dwork = k_work_delayable_from_work(wrk);
	struct mqtt_sn_client *client =
		CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
	int64_t wakeup = 0;

	LOG_DBG("Executing work of client %p in state %d", client, client->state);

	if (client->state == MQTT_SN_CLIENT_DISCONNECTED) {
		LOG_WRN("%s called while disconnected: Nothing to do", __func__);
		return;
	}

	if (client->state == MQTT_SN_CLIENT_ACTIVE) {
		/* Short-circuit keeps the order topics -> pubs -> ping. */
		if (process_topics(client, &wakeup) != 0 ||
		    process_pubs(client, &wakeup) != 0 ||
		    process_ping(client, &wakeup) != 0) {
			return;
		}
	}

	if (wakeup > 0) {
		k_work_schedule(dwork, K_MSEC(wakeup - k_uptime_get()));
	}
}
/**
 * @brief Initialise a client structure.
 *
 * Clears the client, stores the client ID (pointer and size, the bytes are
 * not copied), transport and event callback, sets up the TX/RX buffers on
 * the caller-provided storage, initialises the work item and calls the
 * transport's init hook if it has one.
 *
 * @param client    Client to initialise.
 * @param client_id Client ID to use.
 * @param transport Transport to send and receive with.
 * @param evt_cb    Application event callback.
 * @param tx        Storage for the TX buffer.
 * @param txsz      Size of @p tx in bytes.
 * @param rx        Storage for the RX buffer.
 * @param rxsz      Size of @p rx in bytes.
 *
 * @return 0 on success, -EINVAL if any pointer argument is NULL.
 */
int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
			struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
			size_t txsz, void *rx, size_t rxsz)
{
	if (!(client && client_id && transport && evt_cb && tx && rx)) {
		return -EINVAL;
	}

	memset(client, 0, sizeof(*client));

	client->evt_cb = evt_cb;
	client->transport = transport;
	client->client_id.size = client_id->size;
	client->client_id.data = client_id->data;

	net_buf_simple_init_with_data(&client->tx, tx, txsz);
	net_buf_simple_reset(&client->tx);

	net_buf_simple_init_with_data(&client->rx, rx, rxsz);
	net_buf_simple_reset(&client->rx);

	k_work_init_delayable(&client->process_work, process_work);

	if (transport->init != NULL) {
		transport->init(transport);
	}

	return 0;
}
/**
 * @brief Release everything a client holds.
 *
 * Frees all queued publishes and topics, calls the transport's deinit hook
 * if present and cancels the processing work item. A NULL client is ignored.
 *
 * @param client Client to tear down.
 */
void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
{
	if (client == NULL) {
		return;
	}

	mqtt_sn_publish_destroy_all(client);
	mqtt_sn_topic_destroy_all(client);

	if (client->transport != NULL && client->transport->deinit != NULL) {
		client->transport->deinit(client->transport);
	}

	k_work_cancel_delayable(&client->process_work);
}
int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
if (!client) {
return -EINVAL;
}
if (will && (!client->will_msg.data || !client->will_topic.data)) {
LOG_ERR("will set to true, but no will data in client");
return -EINVAL;
}
if (clean_session) {
mqtt_sn_topic_destroy_all(client);
}
p.params.connect.clean_session = clean_session;
p.params.connect.will = will;
p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
p.params.connect.client_id.data = client->client_id.data;
p.params.connect.client_id.size = client->client_id.size;
client->last_ping = k_uptime_get();
return encode_and_send(client, &p);
}
int mqtt_sn_disconnect(struct mqtt_sn_client *client)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
int err;
if (!client) {
return -EINVAL;
}
p.params.disconnect.duration = 0;
err = encode_and_send(client, &p);
mqtt_sn_disconnect_internal(client);
return err;
}
int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
{
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
int err;
if (!client || !duration) {
return -EINVAL;
}
p.params.disconnect.duration = duration;
err = encode_and_send(client, &p);
mqtt_sn_sleep_internal(client);
return err;
}
/**
 * @brief Request a subscription to a topic.
 *
 * If the client does not know the topic yet, a topic entry is created in the
 * SUBSCRIBING state and queued. An already known topic is left untouched.
 * The actual SUBSCRIBE is sent from the processing work item, which is
 * scheduled to run immediately.
 *
 * @param client     Client to subscribe with; must be active.
 * @param qos        Requested QoS.
 * @param topic_name Topic name; data must be non-NULL and size non-zero.
 *
 * @return 0 on success, -EINVAL on bad arguments, -ENOTCONN if the client is
 *         not active, -ENOMEM if no topic entry could be created, or a
 *         negative error from scheduling the work item.
 */
int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
		      struct mqtt_sn_data *topic_name)
{
	struct mqtt_sn_topic *entry;
	int ret;

	if (client == NULL || topic_name == NULL || topic_name->data == NULL ||
	    topic_name->size == 0) {
		return -EINVAL;
	}

	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
		LOG_ERR("Cannot subscribe: not connected");
		return -ENOTCONN;
	}

	entry = mqtt_sn_topic_find_name(client, topic_name);
	if (entry == NULL) {
		entry = mqtt_sn_topic_create(topic_name);
		if (entry == NULL) {
			return -ENOMEM;
		}

		entry->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
		entry->qos = qos;
		sys_slist_append(&client->topic, &entry->next);
	}

	ret = k_work_reschedule(&client->process_work, K_NO_WAIT);

	/* Non-negative results all count as success. */
	return (ret < 0) ? ret : 0;
}
/**
 * @brief Request unsubscription from a topic.
 *
 * The topic entry is moved to the UNSUBSCRIBING state with a fresh message
 * ID and retry counter. The actual UNSUBSCRIBE is sent from the processing
 * work item, which is scheduled to run immediately.
 *
 * @param client     Client to unsubscribe with; must be active.
 * @param qos        Not used by this function.
 * @param topic_name Topic name; data must be non-NULL and size non-zero.
 *
 * @return 0 on success, -EINVAL on bad arguments, -ENOTCONN if the client is
 *         not active, -ENOENT if the client does not know the topic, or a
 *         negative error from scheduling the work item.
 */
int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
			struct mqtt_sn_data *topic_name)
{
	struct mqtt_sn_topic *topic;
	int err;

	/*
	 * Same validation as mqtt_sn_subscribe(): the name is compared with
	 * memcmp() during lookup, so a NULL data pointer must not get that far.
	 */
	if (!client || !topic_name || !topic_name->data || !topic_name->size) {
		return -EINVAL;
	}

	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
		LOG_ERR("Cannot unsubscribe: not connected");
		return -ENOTCONN;
	}

	topic = mqtt_sn_topic_find_name(client, topic_name);
	if (!topic) {
		LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
		return -ENOENT;
	}

	topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
	/* New request: new message ID, full retry budget. */
	mqtt_sn_con_init(&topic->con);

	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
	if (err < 0) {
		return err;
	}

	return 0;
}
int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
{
struct mqtt_sn_publish *pub;
struct mqtt_sn_topic *topic;
int err;
if (!client || !topic_name) {
return -EINVAL;
}
if (qos == MQTT_SN_QOS_M1) {
LOG_ERR("QoS -1 not supported");
return -ENOTSUP;
}
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
LOG_ERR("Cannot publish: disconnected");
return -ENOTCONN;
}
topic = mqtt_sn_topic_find_name(client, topic_name);
if (!topic) {
topic = mqtt_sn_topic_create(topic_name);
if (!topic) {
return -ENOMEM;
}
topic->qos = qos;
topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
sys_slist_append(&client->topic, &topic->next);
}
pub = mqtt_sn_publish_create(data);
if (!pub) {
return -ENOMEM;
}
pub->qos = qos;
pub->retain = retain;
pub->topic = topic;
sys_slist_append(&client->publish, &pub->next);
err = k_work_reschedule(&client->process_work, K_NO_WAIT);
if (err < 0) {
return err;
}
return 0;
}
static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
LOG_INF("MQTT_SN client connected");
switch (client->state) {
case MQTT_SN_CLIENT_DISCONNECTED:
case MQTT_SN_CLIENT_ASLEEP:
case MQTT_SN_CLIENT_AWAKE:
mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
client->ping_retries = N_RETRY;
break;
default:
LOG_ERR("Client received CONNACK but was in state %d", client->state);
return;
}
} else {
LOG_WRN("CONNACK ret code %d", p->ret_code);
mqtt_sn_disconnect_internal(client);
}
k_work_schedule(&client->process_work, K_NO_WAIT);
}
static void handle_willtopicreq(struct mqtt_sn_client *client)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
response.params.willtopic.qos = client->will_qos;
response.params.willtopic.retain = client->will_retain;
response.params.willtopic.topic.data = client->will_topic.data;
response.params.willtopic.topic.size = client->will_topic.size;
encode_and_send(client, &response);
}
static void handle_willmsgreq(struct mqtt_sn_client *client)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
response.params.willmsg.msg.data = client->will_msg.data;
response.params.willmsg.msg.size = client->will_msg.size;
encode_and_send(client, &response);
}
static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
struct mqtt_sn_topic *topic;
topic = mqtt_sn_topic_create(&p->topic);
if (!topic) {
return;
}
topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
topic->topic_id = p->topic_id;
topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
response.params.regack.topic_id = p->topic_id;
response.params.regack.msg_id = p->msg_id;
encode_and_send(client, &response);
}
/**
 * @brief Handle a REGACK from the gateway.
 *
 * The topic is looked up by message ID. On acceptance it becomes registered
 * and takes the topic ID from the message; otherwise only a warning is
 * logged and the topic is left unchanged.
 *
 * @param client Client that received the message.
 * @param p      Decoded REGACK parameters.
 */
static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
{
	struct mqtt_sn_topic *entry = mqtt_sn_topic_find_msg_id(client, p->msg_id);

	if (entry == NULL) {
		LOG_ERR("Can't REGACK, no topic found");
		return;
	}

	if (p->ret_code != MQTT_SN_CODE_ACCEPTED) {
		LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
			p->ret_code);
		return;
	}

	entry->topic_id = p->topic_id;
	entry->state = MQTT_SN_TOPIC_STATE_REGISTERED;
}
static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
{
struct mqtt_sn_param response;
struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
.topic_id = p->topic_id,
.topic_type = p->topic_type},
.type = MQTT_SN_EVT_PUBLISH};
if (p->qos == MQTT_SN_QOS_1) {
response.type = MQTT_SN_MSG_TYPE_PUBACK;
response.params.puback.topic_id = p->topic_id;
response.params.puback.msg_id = p->msg_id;
response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
encode_and_send(client, &response);
} else if (p->qos == MQTT_SN_QOS_2) {
response.type = MQTT_SN_MSG_TYPE_PUBREC;
response.params.pubrec.msg_id = p->msg_id;
encode_and_send(client, &response);
}
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
}
/**
 * @brief Handle a PUBACK: the matching queued publish is complete and is destroyed.
 *
 * @param client Client that received the message.
 * @param p      Decoded PUBACK parameters.
 */
static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
{
	struct mqtt_sn_publish *acked = mqtt_sn_publish_find_msg_id(client, p->msg_id);

	if (acked != NULL) {
		mqtt_sn_publish_destroy(client, acked);
		return;
	}

	LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
}
static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
if (!pub) {
LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
return;
}
pub->con.last_attempt = k_uptime_get();
pub->con.retries = N_RETRY;
response.params.pubrel.msg_id = p->msg_id;
encode_and_send(client, &response);
}
static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
response.params.pubcomp.msg_id = p->msg_id;
encode_and_send(client, &response);
}
/**
 * @brief Handle a PUBCOMP: the matching queued publish is complete and is destroyed.
 *
 * @param client Client that received the message.
 * @param p      Decoded PUBCOMP parameters.
 */
static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
{
	struct mqtt_sn_publish *done = mqtt_sn_publish_find_msg_id(client, p->msg_id);

	if (done != NULL) {
		mqtt_sn_publish_destroy(client, done);
		return;
	}

	LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
}
/**
 * @brief Handle a SUBACK from the gateway.
 *
 * The topic is looked up by message ID. On acceptance it becomes subscribed
 * and takes the topic ID and granted QoS from the message; otherwise only a
 * warning is logged and the topic is left unchanged.
 *
 * @param client Client that received the message.
 * @param p      Decoded SUBACK parameters.
 */
static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
{
	struct mqtt_sn_topic *entry = mqtt_sn_topic_find_msg_id(client, p->msg_id);

	if (entry == NULL) {
		LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
		return;
	}

	if (p->ret_code != MQTT_SN_CODE_ACCEPTED) {
		LOG_WRN("SUBACK with ret code %d", p->ret_code);
		return;
	}

	entry->qos = p->qos;
	entry->topic_id = p->topic_id;
	entry->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
}
/**
 * @brief Handle an UNSUBACK: destroy the topic that was being unsubscribed.
 *
 * The message is ignored (with an error log) unless a topic with the same
 * message ID exists and is in the UNSUBSCRIBING state.
 *
 * @param client Client that received the message.
 * @param p      Decoded UNSUBACK parameters.
 */
static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
{
	struct mqtt_sn_topic *entry = mqtt_sn_topic_find_msg_id(client, p->msg_id);

	if (entry != NULL && entry->state == MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
		mqtt_sn_topic_destroy(client, entry);
		return;
	}

	LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
}
static void handle_pingreq(struct mqtt_sn_client *client)
{
struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
encode_and_send(client, &response);
}
static void handle_pingresp(struct mqtt_sn_client *client)
{
struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
if (client->evt_cb) {
client->evt_cb(client, &evt);
}
if (client->state == MQTT_SN_CLIENT_AWAKE) {
mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
}
client->ping_retries = N_RETRY;
}
/**
 * @brief Handle a DISCONNECT from the gateway by disconnecting the client.
 *
 * @param client Client that received the message.
 * @param p      Decoded DISCONNECT parameters; not used.
 */
static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
{
	(void)p;

	LOG_INF("Received DISCONNECT");

	mqtt_sn_disconnect_internal(client);
}
static int handle_msg(struct mqtt_sn_client *client)
{
int err;
struct mqtt_sn_param p;
err = mqtt_sn_decode_msg(&client->rx, &p);
if (err) {
return err;
}
LOG_INF("Got message of type %d", p.type);
switch (p.type) {
case MQTT_SN_MSG_TYPE_GWINFO:
break;
case MQTT_SN_MSG_TYPE_CONNACK:
handle_connack(client, &p.params.connack);
break;
case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
handle_willtopicreq(client);
break;
case MQTT_SN_MSG_TYPE_WILLMSGREQ:
handle_willmsgreq(client);
break;
case MQTT_SN_MSG_TYPE_REGISTER:
handle_register(client, &p.params.reg);
break;
case MQTT_SN_MSG_TYPE_REGACK:
handle_regack(client, &p.params.regack);
break;
case MQTT_SN_MSG_TYPE_PUBLISH:
handle_publish(client, &p.params.publish);
break;
case MQTT_SN_MSG_TYPE_PUBACK:
handle_puback(client, &p.params.puback);
break;
case MQTT_SN_MSG_TYPE_PUBREC:
handle_pubrec(client, &p.params.pubrec);
break;
case MQTT_SN_MSG_TYPE_PUBREL:
handle_pubrel(client, &p.params.pubrel);
break;
case MQTT_SN_MSG_TYPE_PUBCOMP:
handle_pubcomp(client, &p.params.pubcomp);
break;
case MQTT_SN_MSG_TYPE_SUBACK:
handle_suback(client, &p.params.suback);
break;
case MQTT_SN_MSG_TYPE_UNSUBACK:
handle_unsuback(client, &p.params.unsuback);
break;
case MQTT_SN_MSG_TYPE_PINGREQ:
handle_pingreq(client);
break;
case MQTT_SN_MSG_TYPE_PINGRESP:
handle_pingresp(client);
break;
case MQTT_SN_MSG_TYPE_DISCONNECT:
handle_disconnect(client, &p.params.disconnect);
break;
case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
break;
case MQTT_SN_MSG_TYPE_WILLMSGRESP:
break;
default:
LOG_ERR("Unexpected message type %d", p.type);
break;
}
k_work_reschedule(&client->process_work, K_NO_WAIT);
return 0;
}
/**
 * @brief Receive one frame from the transport and process it.
 *
 * If the transport has a poll hook it is asked first; a non-positive result
 * is returned as is. The frame is then read into the client's RX buffer and
 * handed to the message handler.
 *
 * @param client Client to receive for; needs a transport with a recv hook.
 *
 * @return -EINVAL on bad arguments, the non-positive result of poll/recv if
 *         nothing was received, -ENOBUFS if the reported frame size exceeds
 *         the RX buffer, the handler's error code if the frame could not be
 *         processed, otherwise the negated number of bytes left in the RX
 *         buffer (expected to be 0).
 */
int mqtt_sn_input(struct mqtt_sn_client *client)
{
	ssize_t frame_len;
	int ret;

	if (client == NULL || client->transport == NULL || client->transport->recv == NULL) {
		return -EINVAL;
	}

	if (client->transport->poll != NULL) {
		frame_len = client->transport->poll(client);
		if (frame_len <= 0) {
			return frame_len;
		}
	}

	net_buf_simple_reset(&client->rx);

	frame_len = client->transport->recv(client, client->rx.data, client->rx.size);
	if (frame_len <= 0) {
		return frame_len;
	}

	if (frame_len > client->rx.size) {
		return -ENOBUFS;
	}

	client->rx.len = frame_len;

	LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");

	ret = handle_msg(client);
	if (ret != 0) {
		return ret;
	}

	/* Should be zero */
	return -client->rx.len;
}