| /* |
| * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com> |
| * SPDX-License-Identifier: Apache-2.0 |
| */ |
| |
| #include <zephyr/kernel.h> |
| #include <zephyr/init.h> |
| #include <zephyr/sys/iterable_sections.h> |
| #include <zephyr/logging/log.h> |
| #include <zephyr/sys/printk.h> |
| #include <zephyr/net/buf.h> |
| #include <zephyr/zbus/zbus.h> |
| LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); |
| |
| #if defined(CONFIG_ZBUS_PRIORITY_BOOST) |
| /* Protects each channel's highest observer priority (HOP); only used when |
| * the priority boost is enabled. |
| */ |
| static struct k_spinlock _zbus_chan_slock; |
| #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ |
| |
| static struct k_spinlock obs_slock; |
| |
| #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) |
| |
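| /* Message subscribers receive a copy of each published message carried in a |
| * net_buf. Depending on the Kconfig selection below, the buffer data is |
| * either allocated from the kernel heap at publish time (dynamic) or taken |
| * from a fixed-size pool whose per-buffer data size must fit the largest |
| * published message. Both pools reserve user data space for the source |
| * channel pointer. |
| */ |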
| #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC) |
| |
| NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE, |
| sizeof(struct zbus_channel *), NULL); |
| BUILD_ASSERT(K_HEAP_MEM_POOL_SIZE > 0, "MSG_SUBSCRIBER feature requires heap memory pool."); |
| |
| static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size, |
| k_timeout_t timeout) |
| { |
| return net_buf_alloc_len(&_zbus_msg_subscribers_pool, size, timeout); |
| } |
| |
| #else |
| |
| NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool, |
| (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE), |
| (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE), |
| sizeof(struct zbus_channel *), NULL); |
| |
| static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size, |
| k_timeout_t timeout) |
| { |
| __ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE, |
| "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to " |
| "%d", |
| (int)size); |
| return net_buf_alloc(&_zbus_msg_subscribers_pool, timeout); |
| } |
| #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */ |
| |
| #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ |
| |
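| /* Compute each channel's slice [observers_start_idx, observers_end_idx) of |
| * the zbus_channel_observation iterable section. Entries belonging to the |
| * same channel are contiguous in the section, so a single pass is enough. |
| * Afterwards every channel gets its semaphore (used as the channel lock) |
| * and, when runtime observers are enabled, its dynamic observer list |
| * initialized. |
| */ |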
| int _zbus_init(void) |
| { |
| const struct zbus_channel *curr = NULL; |
| const struct zbus_channel *prev = NULL; |
| |
| STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) { |
| curr = observation->chan; |
| |
| if (prev != curr) { |
| if (prev == NULL) { |
| curr->data->observers_start_idx = 0; |
| curr->data->observers_end_idx = 0; |
| } else { |
| curr->data->observers_start_idx = prev->data->observers_end_idx; |
| curr->data->observers_end_idx = prev->data->observers_end_idx; |
| } |
| prev = curr; |
| } |
| |
| ++(curr->data->observers_end_idx); |
| } |
| STRUCT_SECTION_FOREACH(zbus_channel, chan) { |
| k_sem_init(&chan->data->sem, 1, 1); |
| |
| #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) |
| sys_slist_init(&chan->data->observers); |
| #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ |
| } |
| return 0; |
| } |
| SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY); |
| |
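| /* Deliver one notification according to the observer type: listeners are |
| * called synchronously in the publishing context; subscribers get the |
| * channel reference queued on their message queue; message subscribers get a |
| * clone of the net_buf holding a copy of the message, with the channel |
| * pointer stored in the buffer's user data. |
| */ |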
| static inline int _zbus_notify_observer(const struct zbus_channel *chan, |
| const struct zbus_observer *obs, k_timepoint_t end_time, |
| struct net_buf *buf) |
| { |
| switch (obs->type) { |
| case ZBUS_OBSERVER_LISTENER_TYPE: { |
| obs->callback(chan); |
| break; |
| } |
| case ZBUS_OBSERVER_SUBSCRIBER_TYPE: { |
| return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); |
| } |
| #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) |
| case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: { |
| struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time)); |
| |
| if (cloned_buf == NULL) { |
| return -ENOMEM; |
| } |
| memcpy(net_buf_user_data(cloned_buf), &chan, sizeof(struct zbus_channel *)); |
| |
| net_buf_put(obs->message_fifo, cloned_buf); |
| |
| break; |
| } |
| #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ |
| |
| default: |
| _ZBUS_ASSERT(false, "Unreachable"); |
| } |
| return 0; |
| } |
| |
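| /* Virtual Distributed Event Dispatcher (VDED): snapshot the channel message |
| * into a net_buf once (only when message subscribers are enabled), then |
| * notify the channel's statically defined observers followed by its runtime |
| * observers. Dispatching continues after individual delivery failures and |
| * the last error is returned, except for -ENOMEM, which aborts immediately. |
| */ |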
| static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time) |
| { |
| int err = 0; |
| int last_error = 0; |
| struct net_buf *buf = NULL; |
| |
| /* Static observer event dispatcher logic */ |
| struct zbus_channel_observation *observation; |
| struct zbus_channel_observation_mask *observation_mask; |
| |
| #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) |
| buf = _zbus_create_net_buf(&_zbus_msg_subscribers_pool, zbus_chan_msg_size(chan), |
| sys_timepoint_timeout(end_time)); |
| |
| _ZBUS_ASSERT(buf != NULL, "net_buf pool _zbus_msg_subscribers_pool is " |
| "exhausted or the heap is full"); |
| |
| net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan)); |
| #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ |
| |
| LOG_DBG("Notifing %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan)); |
| |
| int __maybe_unused index = 0; |
| |
| for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; |
| i < limit; ++i) { |
| STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); |
| STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); |
| |
| _ZBUS_ASSERT(observation != NULL, "observation must not be NULL"); |
| |
| const struct zbus_observer *obs = observation->obs; |
| |
| if (!obs->data->enabled || observation_mask->enabled) { |
| continue; |
| } |
| |
| err = _zbus_notify_observer(chan, obs, end_time, buf); |
| |
| if (err) { |
| last_error = err; |
| LOG_ERR("could not deliver notification to observer %s. Error code %d", |
| _ZBUS_OBS_NAME(obs), err); |
| if (err == -ENOMEM) { |
| if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) { |
| net_buf_unref(buf); |
| } |
| return err; |
| } |
| } |
| |
| LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs)); |
| } |
| |
| #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) |
| /* Dynamic observer event dispatcher logic */ |
| struct zbus_observer_node *obs_nd, *tmp; |
| |
| SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { |
| |
| const struct zbus_observer *obs = obs_nd->obs; |
| |
| if (!obs->data->enabled) { |
| continue; |
| } |
| |
| err = _zbus_notify_observer(chan, obs, end_time, buf); |
| |
| if (err) { |
| last_error = err; |
| } |
| } |
| #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ |
| |
| IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);)) |
| |
| return last_error; |
| } |
| |
| #if defined(CONFIG_ZBUS_PRIORITY_BOOST) |
| |
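| /* Recompute the channel's HOP (highest observer priority): scan the |
| * channel's static observation slice and keep the numerically lowest, i.e. |
| * highest, priority among enabled, unmasked observers. |
| */ |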
| static inline void chan_update_hop(const struct zbus_channel *chan) |
| { |
| struct zbus_channel_observation *observation; |
| struct zbus_channel_observation_mask *observation_mask; |
| |
| int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY; |
| |
| K_SPINLOCK(&_zbus_chan_slock) { |
| const int limit = chan->data->observers_end_idx; |
| |
| for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { |
| STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); |
| STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); |
| |
| __ASSERT(observation != NULL, "observation must not be NULL"); |
| |
| const struct zbus_observer *obs = observation->obs; |
| |
| if (!obs->data->enabled || observation_mask->enabled) { |
| continue; |
| } |
| |
| if (chan_highest_observer_priority > obs->data->priority) { |
| chan_highest_observer_priority = obs->data->priority; |
| } |
| } |
| chan->data->highest_observer_priority = chan_highest_observer_priority; |
| } |
| } |
| |
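| /* Recompute the HOP of every channel observed by the given observer. Called |
| * whenever the observer's priority, enable state, or notification mask |
| * changes. |
| */ |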
| static inline void update_all_channels_hop(const struct zbus_observer *obs) |
| { |
| struct zbus_channel_observation *observation; |
| |
| int count; |
| |
| STRUCT_SECTION_COUNT(zbus_channel_observation, &count); |
| |
| for (int16_t i = 0; i < count; ++i) { |
| STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); |
| |
| if (obs != observation->obs) { |
| continue; |
| } |
| |
| chan_update_hop(observation->chan); |
| } |
| } |
| |
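| /* Record the calling thread's priority as the observer's priority so that |
| * publishers of the observed channels can be boosted accordingly. Must be |
| * called from thread context. |
| */ |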
| int zbus_obs_attach_to_thread(const struct zbus_observer *obs) |
| { |
| _ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR"); |
| _ZBUS_ASSERT(obs != NULL, "obs is required"); |
| |
| int current_thread_priority = k_thread_priority_get(k_current_get()); |
| |
| K_SPINLOCK(&obs_slock) { |
| if (obs->data->priority != current_thread_priority) { |
| obs->data->priority = current_thread_priority; |
| |
| update_all_channels_hop(obs); |
| } |
| } |
| |
| return 0; |
| } |
| |
| int zbus_obs_detach_from_thread(const struct zbus_observer *obs) |
| { |
| _ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR"); |
| _ZBUS_ASSERT(obs != NULL, "obs is required"); |
| |
| K_SPINLOCK(&obs_slock) { |
| obs->data->priority = ZBUS_MIN_THREAD_PRIORITY; |
| |
| update_all_channels_hop(obs); |
| } |
| |
| return 0; |
| } |
| |
| #else |
| |
| static inline void update_all_channels_hop(const struct zbus_observer *obs) |
| { |
| } |
| |
| #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ |
| |
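| /* Take the channel lock. With priority boost enabled and when called from a |
| * thread, the caller's priority is first raised just above the channel's HOP |
| * (clamped to priority 0) so a low-priority publisher cannot delay |
| * higher-priority observers while holding the semaphore. |
| */ |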
| static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio) |
| { |
| bool boosting = false; |
| |
| #if defined(CONFIG_ZBUS_PRIORITY_BOOST) |
| if (!k_is_in_isr()) { |
| *prio = k_thread_priority_get(k_current_get()); |
| |
| K_SPINLOCK(&_zbus_chan_slock) { |
| if (*prio > chan->data->highest_observer_priority) { |
| int new_prio = chan->data->highest_observer_priority - 1; |
| |
| new_prio = MAX(new_prio, 0); |
| |
| /* Boost the current thread's priority because at least one enabled |
| * observer of this channel has a higher priority (numerically lower |
| * value) than the current thread. |
| */ |
| k_thread_priority_set(k_current_get(), new_prio); |
| |
| boosting = true; |
| } |
| } |
| } |
| #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ |
| |
| int err = k_sem_take(&chan->data->sem, timeout); |
| |
| if (err) { |
| /* When the priority boost is disabled, `boosting` is always false and |
| * the compiler optimizes this branch out. |
| */ |
| if (boosting) { |
| /* Restoring thread priority since the semaphore is not available */ |
| k_thread_priority_set(k_current_get(), *prio); |
| } |
| |
| return err; |
| } |
| |
| return 0; |
| } |
| |
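| /* Release the channel lock and, with priority boost enabled, restore the |
| * caller's priority recorded by chan_lock() in case it was elevated. |
| */ |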
| static inline void chan_unlock(const struct zbus_channel *chan, int prio) |
| { |
| k_sem_give(&chan->data->sem); |
| |
| #if defined(CONFIG_ZBUS_PRIORITY_BOOST) |
| /* During the unlock phase, with the priority boost enabled, the priority must be |
| * restored to the original value in case it was elevated |
| */ |
| if (prio < ZBUS_MIN_THREAD_PRIORITY) { |
| k_thread_priority_set(k_current_get(), prio); |
| } |
| #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ |
| } |
| |
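| /* Usage sketch (illustrative only; the channel, message type, and listener |
| * names below are hypothetical, the macros come from <zephyr/zbus/zbus.h>): |
| * |
| *   ZBUS_LISTENER_DEFINE(acc_listener, acc_listener_cb); |
| * |
| *   ZBUS_CHAN_DEFINE(acc_chan, struct acc_msg, NULL, NULL, |
| *                    ZBUS_OBSERVERS(acc_listener), ZBUS_MSG_INIT(0)); |
| * |
| *   struct acc_msg msg = {.x = 1}; |
| * |
| *   int err = zbus_chan_pub(&acc_chan, &msg, K_MSEC(200)); |
| */ |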
| int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) |
| { |
| int err; |
| |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| _ZBUS_ASSERT(msg != NULL, "msg is required"); |
| |
| if (k_is_in_isr()) { |
| timeout = K_NO_WAIT; |
| } |
| |
| k_timepoint_t end_time = sys_timepoint_calc(timeout); |
| |
| if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { |
| return -ENOMSG; |
| } |
| |
| int context_priority = ZBUS_MIN_THREAD_PRIORITY; |
| |
| err = chan_lock(chan, timeout, &context_priority); |
| if (err) { |
| return err; |
| } |
| |
| memcpy(chan->message, msg, chan->message_size); |
| |
| err = _zbus_vded_exec(chan, end_time); |
| |
| chan_unlock(chan, context_priority); |
| |
| return err; |
| } |
| |
| int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout) |
| { |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| _ZBUS_ASSERT(msg != NULL, "msg is required"); |
| |
| if (k_is_in_isr()) { |
| timeout = K_NO_WAIT; |
| } |
| |
| int err = k_sem_take(&chan->data->sem, timeout); |
| if (err) { |
| return err; |
| } |
| |
| memcpy(msg, chan->message, chan->message_size); |
| |
| k_sem_give(&chan->data->sem); |
| |
| return 0; |
| } |
| |
| int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) |
| { |
| int err; |
| |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| |
| if (k_is_in_isr()) { |
| timeout = K_NO_WAIT; |
| } |
| |
| k_timepoint_t end_time = sys_timepoint_calc(timeout); |
| |
| int context_priority = ZBUS_MIN_THREAD_PRIORITY; |
| |
| err = chan_lock(chan, timeout, &context_priority); |
| if (err) { |
| return err; |
| } |
| |
| err = _zbus_vded_exec(chan, end_time); |
| |
| chan_unlock(chan, context_priority); |
| |
| return err; |
| } |
| |
| int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) |
| { |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| |
| if (k_is_in_isr()) { |
| timeout = K_NO_WAIT; |
| } |
| |
| int err = k_sem_take(&chan->data->sem, timeout); |
| |
| if (err) { |
| return err; |
| } |
| |
| return 0; |
| } |
| |
| int zbus_chan_finish(const struct zbus_channel *chan) |
| { |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| |
| k_sem_give(&chan->data->sem); |
| |
| return 0; |
| } |
| |
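| /* Usage sketch for a plain subscriber thread (illustrative only; names are |
| * hypothetical): |
| * |
| *   ZBUS_SUBSCRIBER_DEFINE(acc_sub, 4); |
| * |
| *   const struct zbus_channel *chan; |
| * |
| *   while (zbus_sub_wait(&acc_sub, &chan, K_FOREVER) == 0) { |
| *           struct acc_msg msg; |
| * |
| *           zbus_chan_read(chan, &msg, K_MSEC(100)); |
| *   } |
| */ |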
| int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, |
| k_timeout_t timeout) |
| { |
| _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs"); |
| _ZBUS_ASSERT(sub != NULL, "sub is required"); |
| _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER"); |
| _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required"); |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| |
| return k_msgq_get(sub->queue, chan, timeout); |
| } |
| |
| #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) |
| |
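| /* Usage sketch for a message subscriber thread (illustrative only; names |
| * are hypothetical). Unlike plain subscribers, the message copy taken at |
| * publish time is returned together with the channel reference: |
| * |
| *   ZBUS_MSG_SUBSCRIBER_DEFINE(acc_msg_sub); |
| * |
| *   const struct zbus_channel *chan; |
| *   struct acc_msg msg; |
| * |
| *   while (zbus_sub_wait_msg(&acc_msg_sub, &chan, &msg, K_FOREVER) == 0) { |
| *           process_acc(&msg); |
| *   } |
| */ |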
| int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, |
| k_timeout_t timeout) |
| { |
| _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs"); |
| _ZBUS_ASSERT(sub != NULL, "sub is required"); |
| _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, |
| "sub must be a MSG_SUBSCRIBER"); |
| _ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required"); |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| _ZBUS_ASSERT(msg != NULL, "msg is required"); |
| |
| struct net_buf *buf = net_buf_get(sub->message_fifo, timeout); |
| |
| if (buf == NULL) { |
| return -ENOMSG; |
| } |
| |
| *chan = *((struct zbus_channel **)net_buf_user_data(buf)); |
| |
| memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan)); |
| |
| net_buf_unref(buf); |
| |
| return 0; |
| } |
| |
| #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ |
| |
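| /* Mask (or unmask) notifications from a single channel to one observer |
| * without disabling the observer on its other channels. Only the channel's |
| * static observation slice is searched, so -ESRCH is returned when the |
| * observer does not statically observe the channel. |
| */ |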
| int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, |
| const struct zbus_channel *chan, bool masked) |
| { |
| _ZBUS_ASSERT(obs != NULL, "obs is required"); |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| |
| int err = -ESRCH; |
| |
| struct zbus_channel_observation *observation; |
| struct zbus_channel_observation_mask *observation_mask; |
| |
| K_SPINLOCK(&obs_slock) { |
| for (int16_t i = chan->data->observers_start_idx, |
| limit = chan->data->observers_end_idx; |
| i < limit; ++i) { |
| STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); |
| STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); |
| |
| __ASSERT(observation != NULL, "observation must not be NULL"); |
| |
| if (observation->obs == obs) { |
| if (observation_mask->enabled != masked) { |
| observation_mask->enabled = masked; |
| |
| update_all_channels_hop(obs); |
| } |
| |
| err = 0; |
| |
| K_SPINLOCK_BREAK; |
| } |
| } |
| } |
| |
| return err; |
| } |
| |
| int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, |
| const struct zbus_channel *chan, bool *masked) |
| { |
| _ZBUS_ASSERT(obs != NULL, "obs is required"); |
| _ZBUS_ASSERT(chan != NULL, "chan is required"); |
| |
| int err = -ESRCH; |
| |
| struct zbus_channel_observation *observation; |
| struct zbus_channel_observation_mask *observation_mask; |
| |
| K_SPINLOCK(&obs_slock) { |
| const int limit = chan->data->observers_end_idx; |
| |
| for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { |
| STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); |
| STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); |
| |
| __ASSERT(observation != NULL, "observation must not be NULL"); |
| |
| if (observation->obs == obs) { |
| *masked = observation_mask->enabled; |
| |
| err = 0; |
| |
| K_SPINLOCK_BREAK; |
| } |
| } |
| } |
| |
| return err; |
| } |
| |
| int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled) |
| { |
| _ZBUS_ASSERT(obs != NULL, "obs is required"); |
| |
| K_SPINLOCK(&obs_slock) { |
| if (obs->data->enabled != enabled) { |
| obs->data->enabled = enabled; |
| |
| update_all_channels_hop(obs); |
| } |
| } |
| |
| return 0; |
| } |