| /* |
| * Copyright (c) 2018 Intel Corporation |
| * |
| * SPDX-License-Identifier: Apache-2.0 |
| */ |
| #include <kernel.h> |
| #include <errno.h> |
| #include <string.h> |
| #include <atomic.h> |
| #include <posix/time.h> |
| #include <posix/mqueue.h> |
| |
| typedef struct mqueue_object { |
| sys_snode_t snode; |
| char *mem_buffer; |
| char *mem_obj; |
| struct k_msgq queue; |
| atomic_t ref_count; |
| char *name; |
| } mqueue_object; |
| |
| typedef struct mqueue_desc { |
| char *mem_desc; |
| mqueue_object *mqueue; |
| u32_t flags; |
| } mqueue_desc; |
| |
| K_SEM_DEFINE(mq_sem, 1, 1); |
| |
| /* Initialize the list */ |
| sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list); |
| |
| s64_t timespec_to_timeoutms(const struct timespec *abstime); |
| static mqueue_object *find_in_list(const char *name); |
| static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len, |
| s32_t timeout); |
| static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, |
| s32_t timeout); |
| static void remove_mq(mqueue_object *msg_queue); |
| |
| /** |
| * @brief Open a message queue. |
| * |
| * Number of message queue and descriptor to message queue are limited by |
| * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE. |
| * |
| * See IEEE 1003.1 |
| */ |
| mqd_t mq_open(const char *name, int oflags, ...) |
| { |
| va_list va; |
| mode_t mode; |
| mq_attr *attrs = NULL; |
| long msg_size = 0U, max_msgs = 0U; |
| mqueue_object *msg_queue; |
| mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1); |
| char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr; |
| |
| va_start(va, oflags); |
| if ((oflags & O_CREAT) != 0) { |
| mode = va_arg(va, mode_t); |
| attrs = va_arg(va, mq_attr*); |
| } |
| va_end(va); |
| |
| if (attrs != NULL) { |
| msg_size = attrs->mq_msgsize; |
| max_msgs = attrs->mq_maxmsg; |
| } |
| |
| if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 || |
| max_msgs <= 0))) { |
| errno = EINVAL; |
| return (mqd_t)mqd; |
| } |
| |
| if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) { |
| errno = ENAMETOOLONG; |
| return (mqd_t)mqd; |
| } |
| |
| /* Check if queue already exists */ |
| k_sem_take(&mq_sem, K_FOREVER); |
| msg_queue = find_in_list(name); |
| k_sem_give(&mq_sem); |
| |
| if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 && |
| (oflags & O_EXCL) != 0) { |
| /* Message queue has alreadey been opened and O_EXCL is set */ |
| errno = EEXIST; |
| return (mqd_t)mqd; |
| } |
| |
| if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) { |
| errno = ENOENT; |
| return (mqd_t)mqd; |
| } |
| |
| mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc)); |
| if (mq_desc_ptr != NULL) { |
| (void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc)); |
| msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr; |
| msg_queue_desc->mem_desc = mq_desc_ptr; |
| } else { |
| goto free_mq_desc; |
| } |
| |
| |
| /* Allocate mqueue object for new message queue */ |
| if (msg_queue == NULL) { |
| |
| /* Check for message quantity and size in message queue */ |
| if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX && |
| attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) { |
| goto free_mq_desc; |
| } |
| |
| mq_obj_ptr = k_malloc(sizeof(mqueue_object)); |
| if (mq_obj_ptr != NULL) { |
| (void)memset(mq_obj_ptr, 0, sizeof(mqueue_object)); |
| msg_queue = (mqueue_object *)mq_obj_ptr; |
| msg_queue->mem_obj = mq_obj_ptr; |
| |
| } else { |
| goto free_mq_object; |
| } |
| |
| mq_name_ptr = k_malloc(strlen(name) + 1); |
| if (mq_name_ptr != NULL) { |
| (void)memset(mq_name_ptr, 0, strlen(name) + 1); |
| msg_queue->name = mq_name_ptr; |
| |
| } else { |
| goto free_mq_name; |
| } |
| |
| strcpy(msg_queue->name, name); |
| |
| mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(u8_t)); |
| if (mq_buf_ptr != NULL) { |
| (void)memset(mq_buf_ptr, 0, |
| msg_size * max_msgs * sizeof(u8_t)); |
| msg_queue->mem_buffer = mq_buf_ptr; |
| } else { |
| goto free_mq_buffer; |
| } |
| |
| (void)atomic_set(&msg_queue->ref_count, 1); |
| /* initialize zephyr message queue */ |
| k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size, |
| max_msgs); |
| k_sem_take(&mq_sem, K_FOREVER); |
| sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode)); |
| k_sem_give(&mq_sem); |
| |
| } else { |
| atomic_inc(&msg_queue->ref_count); |
| } |
| |
| msg_queue_desc->mqueue = msg_queue; |
| msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0; |
| return (mqd_t)msg_queue_desc; |
| |
| free_mq_buffer: |
| k_free(mq_name_ptr); |
| free_mq_name: |
| k_free(mq_obj_ptr); |
| free_mq_object: |
| k_free(mq_desc_ptr); |
| free_mq_desc: |
| errno = ENOSPC; |
| return (mqd_t)mqd; |
| } |
| |
| /** |
| * @brief Close a message queue descriptor. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_close(mqd_t mqdes) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| |
| if (mqd == NULL) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| atomic_dec(&mqd->mqueue->ref_count); |
| |
| /* remove mq if marked for unlink */ |
| if (mqd->mqueue->name == NULL) { |
| remove_mq(mqd->mqueue); |
| } |
| |
| k_free(mqd->mem_desc); |
| return 0; |
| } |
| |
| /** |
| * @brief Remove a message queue. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_unlink(const char *name) |
| { |
| mqueue_object *msg_queue; |
| |
| k_sem_take(&mq_sem, K_FOREVER); |
| msg_queue = find_in_list(name); |
| |
| if (msg_queue == NULL) { |
| k_sem_give(&mq_sem); |
| errno = EBADF; |
| return -1; |
| } |
| |
| k_free(msg_queue->name); |
| msg_queue->name = NULL; |
| k_sem_give(&mq_sem); |
| remove_mq(msg_queue); |
| return 0; |
| } |
| |
| /** |
| * @brief Send a message to a message queue. |
| * |
| * All messages in message queue are of equal priority. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, |
| unsigned int msg_prio) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| s32_t timeout = K_FOREVER; |
| |
| return send_message(mqd, msg_ptr, msg_len, timeout); |
| } |
| |
| /** |
| * @brief Send message to a message queue within abstime time. |
| * |
| * All messages in message queue are of equal priority. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, |
| unsigned int msg_prio, const struct timespec *abstime) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| s32_t timeout; |
| |
| timeout = (s32_t) timespec_to_timeoutms(abstime); |
| return send_message(mqd, msg_ptr, msg_len, timeout); |
| } |
| |
| /** |
| * @brief Receive a message from a message queue. |
| * |
| * All messages in message queue are of equal priority. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, |
| unsigned int *msg_prio) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| s32_t timeout = K_FOREVER; |
| |
| return receive_message(mqd, msg_ptr, msg_len, timeout); |
| |
| } |
| |
| /** |
| * @brief Receive message from a message queue within abstime time. |
| * |
| * All messages in message queue are of equal priority. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, |
| unsigned int *msg_prio, const struct timespec *abstime) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| s32_t timeout = K_NO_WAIT; |
| |
| timeout = (s32_t) timespec_to_timeoutms(abstime); |
| return receive_message(mqd, msg_ptr, msg_len, timeout); |
| } |
| |
| /** |
| * @brief Get message queue attributes. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| struct k_msgq_attrs attrs; |
| |
| if (mqd == NULL) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| k_sem_take(&mq_sem, K_FOREVER); |
| k_msgq_get_attrs(&mqd->mqueue->queue, &attrs); |
| mqstat->mq_flags = mqd->flags; |
| mqstat->mq_maxmsg = attrs.max_msgs; |
| mqstat->mq_msgsize = attrs.msg_size; |
| mqstat->mq_curmsgs = attrs.used_msgs; |
| k_sem_give(&mq_sem); |
| return 0; |
| } |
| |
| /** |
| * @brief Set message queue attributes. |
| * |
| * See IEEE 1003.1 |
| */ |
| int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, |
| struct mq_attr *omqstat) |
| { |
| mqueue_desc *mqd = (mqueue_desc *)mqdes; |
| |
| if (mqd == NULL) { |
| errno = EBADF; |
| return -1; |
| } |
| |
| if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) { |
| errno = EINVAL; |
| return -1; |
| } |
| |
| if (omqstat != NULL) { |
| mq_getattr(mqdes, omqstat); |
| } |
| |
| k_sem_take(&mq_sem, K_FOREVER); |
| mqd->flags = mqstat->mq_flags; |
| k_sem_give(&mq_sem); |
| |
| return 0; |
| } |
| |
| /* Internal functions */ |
| static mqueue_object *find_in_list(const char *name) |
| { |
| sys_snode_t *mq; |
| mqueue_object *msg_queue; |
| |
| mq = mq_list.head; |
| |
| while (mq != NULL) { |
| msg_queue = (mqueue_object *)mq; |
| if (strcmp(msg_queue->name, name) == 0) { |
| return msg_queue; |
| } |
| |
| mq = mq->next; |
| } |
| |
| return NULL; |
| } |
| |
| static s32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len, |
| s32_t timeout) |
| { |
| s32_t ret = -1; |
| |
| if (mqd == NULL) { |
| errno = EBADF; |
| return ret; |
| } |
| |
| if ((mqd->flags & O_NONBLOCK) != 0U) { |
| timeout = K_NO_WAIT; |
| } |
| |
| if (msg_len > mqd->mqueue->queue.msg_size) { |
| errno = EMSGSIZE; |
| return ret; |
| } |
| |
| if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) { |
| errno = (timeout == K_NO_WAIT) ? EAGAIN : ETIMEDOUT; |
| return ret; |
| } |
| |
| return 0; |
| } |
| |
| static s32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len, |
| s32_t timeout) |
| { |
| int ret = -1; |
| |
| if (mqd == NULL) { |
| errno = EBADF; |
| return ret; |
| } |
| |
| if (msg_len < mqd->mqueue->queue.msg_size) { |
| errno = EMSGSIZE; |
| return ret; |
| } |
| |
| if ((mqd->flags & O_NONBLOCK) != 0U) { |
| timeout = K_NO_WAIT; |
| } |
| |
| if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) { |
| errno = (timeout != K_NO_WAIT) ? ETIMEDOUT : EAGAIN; |
| } else { |
| ret = mqd->mqueue->queue.msg_size; |
| } |
| |
| return ret; |
| } |
| |
| static void remove_mq(mqueue_object *msg_queue) |
| { |
| if (atomic_cas(&msg_queue->ref_count, 0, 0)) { |
| k_sem_take(&mq_sem, K_FOREVER); |
| sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue); |
| k_sem_give(&mq_sem); |
| |
| /* Free mq buffer and pbject */ |
| k_free(msg_queue->mem_buffer); |
| k_free(msg_queue->mem_obj); |
| } |
| } |