| .. _zbus: |
| |
| Zephyr bus (zbus) |
| ################# |
| |
| .. |
| Note to documentation authors: the diagrams included in this documentation page were designed |
| using the following Figma library: |
| https://www.figma.com/community/file/1292866458780627559/zbus-diagram-assets |
| |
| |
| The :dfn:`Zephyr bus - zbus` is a lightweight and flexible software bus enabling a simple way for |
| threads to talk to one another in a many-to-many way. |
| |
| .. contents:: |
| :local: |
| :depth: 2 |
| |
| Concepts |
| ******** |
| Threads can send messages to one or more observers using zbus, which makes many-to-many |
| communication possible. The bus implements message-passing and publish/subscribe communication |
| paradigms that enable threads to communicate synchronously or asynchronously through shared memory. |
| |
| The communication through zbus is channel-based. Threads (or callbacks) use channels to exchange |
| messages. Additionally, besides other actions, threads can publish and observe channels. When a |
| thread publishes a message on a channel, the bus will make the message available to all the |
| published channel's observers. Based on the observer's type, it can access the message directly, |
| receive a copy of it, or even receive only a reference of the published channel. |
| |
| The figure below shows an example of a typical application using zbus in which the application logic |
| (hardware independent) talks to other threads via software bus. Note that the threads are decoupled |
| from each other because they only use zbus channels and do not need to know each other to talk. |
| |
| |
| .. figure:: images/zbus_overview.svg |
| :alt: zbus usage overview |
| :width: 75% |
| |
| A typical zbus application architecture. |
| |
| The bus comprises: |
| |
| * A set of channels, each consisting of control metadata and the message itself; |
| * :dfn:`Virtual Distributed Event Dispatcher` (VDED), the bus logic responsible for sending |
| notifications/messages to the observers. The VDED logic runs inside the publishing action in the same |
| thread context, giving the bus an idea of a distributed execution. When a thread publishes to a |
| channel, it also propagates the notifications to the observers; |
| * Threads (subscribers and message subscribers) and callbacks (listeners) publishing, reading, and |
| receiving notifications from the bus. |
| |
| .. figure:: images/zbus_anatomy.svg |
| :alt: ZBus anatomy |
| :width: 70% |
| |
| ZBus anatomy. |
| |
| The bus makes the publish, read, claim, finish, notify, and subscribe actions available over |
| channels. Publishing, reading, claiming, and finishing are available in all RTOS thread contexts, |
| including ISRs. The publish and read operations are simple and fast; the procedure is channel |
| locking followed by a memory copy to and from a shared memory region and then a channel unlocking. |
| Another essential aspect of zbus is the observers. There are three types of observers: |
| |
| .. figure:: images/zbus_type_of_observers.svg |
| :alt: ZBus observers type |
| :width: 70% |
| |
| ZBus observers. |
| |
| * Listeners, callbacks that the event dispatcher executes every time an observed channel is |
| published or notified; |
| * Subscribers, thread-based observers that rely internally on a message queue where the event |
| dispatcher puts a changed channel's reference every time an observed channel is published or |
| notified. Note that this kind of observer does not receive the message itself. It should read the |
| message from the channel after receiving the notification; |
| * Message subscribers, thread-based observers that rely internally on a FIFO where the event |
| dispatcher puts a copy of the message every time an observed channel is published or notified. |
| |
| Channel observation structures define the relationship between a channel and its observers. Each |
| observation is a channel/observer pair. Developers can statically allocate observations using |
| :c:macro:`ZBUS_CHAN_DEFINE` or :c:macro:`ZBUS_CHAN_ADD_OBS`. There are also runtime observers, |
| which enable developers to create observations at runtime. It is possible to disable an observer |
| entirely or to disable observations individually. The event dispatcher ignores disabled observers |
| and observations. |
| |
| .. figure:: images/zbus_observation_mask.svg |
| :alt: ZBus observation mask. |
| :width: 75% |
| |
| ZBus observation mask. |
| |
| The above figure illustrates some states, from (a) to (d), for channels from ``C1`` to ``C5``, |
| ``Subscriber 1``, and the observations. The last two are in orange to indicate they are dynamically |
| allocated (runtime observations). (a) shows that the observer and all observations are enabled. (b) |
| shows the observer is disabled, so the event dispatcher will ignore it. (c) shows the observer |
| enabled but with one static observation disabled, so the event dispatcher will only stop sending |
| notifications from channel ``C3``. In (d), the event dispatcher will stop sending |
| notifications from channels ``C3`` and ``C5`` to ``Subscriber 1``. |
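
The rule the figure illustrates can be sketched in plain C. This is a minimal model and not the zbus implementation: the ``mask_observer`` and ``mask_observation`` types and both functions are hypothetical names introduced only for illustration. It assumes the dispatcher notifies an observer only when the observer is enabled and the specific observation is not disabled.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; these are not the zbus structs. */
struct mask_observer {
	bool enabled; /* false disables the observer for every channel */
};

struct mask_observation {
	const struct mask_observer *obs;
	bool masked; /* true disables only this channel/observer pair */
};

/* Notify only when the observer is enabled and the observation is not masked. */
static bool mask_should_notify(const struct mask_observation *o)
{
	return o->obs->enabled && !o->masked;
}

/* Count how many of the given observations would receive a notification. */
static size_t mask_count_notified(const struct mask_observation *list, size_t len)
{
	size_t count = 0;

	for (size_t i = 0; i < len; ++i) {
		if (mask_should_notify(&list[i])) {
			++count;
		}
	}

	return count;
}
```

With five observations for one enabled observer and the third and fifth masked, as in state (d), three notifications remain. Disabling the observer, as in state (b), leaves none.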
| |
| |
| For illustration purposes, consider the typical sensor-based solution in the figure below. When |
| triggered, the timer publishes to the ``Trigger`` channel. Because the sensor thread subscribes to |
| the ``Trigger`` channel, it is notified and starts acquiring the sensor data. Notice that the VDED |
| executes ``Blink`` because it also listens to the ``Trigger`` channel. When the sensor data is |
| ready, the sensor thread publishes it to the ``Sensor data`` channel. The core thread receives the |
| message as a ``Sensor data`` channel message subscriber, processes the sensor data, and stores it in |
| an internal sample buffer. This repeats until the sample buffer is full; when that happens, the core |
| thread aggregates the sample buffer information, prepares a package, and publishes it to the |
| ``Payload`` channel. The LoRa thread receives it because it is a ``Payload`` channel message |
| subscriber and sends the payload to the cloud. When it completes the transmission, the LoRa thread |
| publishes to the ``Transmission done`` channel. The VDED executes ``Blink`` again since it listens |
| to the ``Transmission done`` channel. |
| |
| .. figure:: images/zbus_operations.svg |
| :alt: ZBus sensor-based application |
| :width: 85% |
| |
| ZBus sensor-based application. |
| |
| This way of implementing the solution makes the application more flexible, enabling us to change |
| things independently. For example, suppose we want to change the trigger from a timer to a button |
| press. We can do that, and the change does not affect other parts of the system. Likewise, if we |
| would like to change the communication interface from LoRa to Bluetooth, we only need to change the |
| LoRa thread. No other change is required to make that work. The same holds for every block in the |
| figure, which indicates that zbus promotes decoupling in the system architecture. |
| |
| Another important aspect of using zbus is the reuse of system modules. If a code portion with |
| well-defined behaviors (we call that a module) only uses zbus channels and not hardware interfaces, |
| it can easily be reused in other solutions. The new solution must implement the interfaces (set of |
| channels) the module needs to work. That indicates zbus could improve module reuse. |
| |
| The last important note is the reach of the zbus solution. There are many ways of using zbus, so |
| developers are as free as possible to create what they need. For example, messages can be |
| dynamically or statically allocated; notifications can be synchronous or asynchronous; developers |
| can control a channel in many different ways by claiming it; developers can add their own metadata |
| to a channel by using the user-data field; the discretionary use of a validator lets the system |
| enforce the message format; and so on. Those characteristics broaden the range of solutions that can |
| be built with zbus and make it a good fit as an open-source community tool. |
| |
| |
| .. _Virtual Distributed Event Dispatcher: |
| |
| Virtual Distributed Event Dispatcher |
| ==================================== |
| |
| The VDED execution always happens in the publisher's context, which can be a thread or an ISR. Be |
| careful with publications inside an ISR because the scheduler won't preempt the VDED. Use that |
| wisely. The basic description of the execution is as follows: |
| |
| |
| * The channel lock is acquired; |
| * The channel receives the new message via direct copy (by a raw :c:func:`memcpy`); |
| * The event dispatcher logic executes the listeners, sends a copy of the message to the message |
| subscribers, and pushes the channel's reference to the subscribers' notification message queue in |
| the same sequence they appear on the channel observers' list. The listeners can perform non-copy |
| quick access to the constant message reference directly (via the :c:func:`zbus_chan_const_msg` |
| function) since the channel is still locked; |
| * At last, the publishing function unlocks the channel. |
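
The steps above can be sketched as a single function in plain C. This is a toy model under stated assumptions, not the zbus source: every ``vded_`` name is hypothetical, the message is reduced to an ``int``, the lock is a plain flag, and each observer holds a single slot instead of a queue or FIFO.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

enum vded_obs_type { VDED_LISTENER, VDED_MSG_SUBSCRIBER, VDED_SUBSCRIBER };

struct vded_channel;

/* Hypothetical observer: only the field matching its type is used. */
struct vded_observer {
	enum vded_obs_type type;
	void (*callback)(const struct vded_channel *chan); /* listener */
	int inbox;                           /* message subscriber: copy of the message */
	const struct vded_channel *notified; /* subscriber: channel reference only */
};

struct vded_channel {
	bool locked;
	int message; /* the channel's single shared message */
	struct vded_observer **observers;
	size_t observer_count;
};

/* Sample listener used to show that listeners run while the channel is locked. */
static int vded_last_seen;
static bool vded_locked_in_listener;

static void vded_listener_cb(const struct vded_channel *chan)
{
	vded_last_seen = chan->message; /* direct access, no copy */
	vded_locked_in_listener = chan->locked;
}

static int vded_publish(struct vded_channel *chan, const int *msg)
{
	if (chan->locked) {
		return -1; /* a real bus would wait for the lock or time out */
	}

	chan->locked = true;                                /* 1. acquire the lock */
	memcpy(&chan->message, msg, sizeof(chan->message)); /* 2. copy the message */

	/* 3. dispatch in the order of the observers list */
	for (size_t i = 0; i < chan->observer_count; ++i) {
		struct vded_observer *obs = chan->observers[i];

		switch (obs->type) {
		case VDED_LISTENER:
			obs->callback(chan);
			break;
		case VDED_MSG_SUBSCRIBER:
			obs->inbox = chan->message;
			break;
		case VDED_SUBSCRIBER:
			obs->notified = chan;
			break;
		}
	}

	chan->locked = false; /* 4. release the lock */

	return 0;
}
```

Everything happens inside ``vded_publish``, which mirrors the point that the dispatcher runs in the publisher's context rather than in a dedicated thread.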
| |
| |
| To illustrate the VDED execution, consider the example illustrated below. We have four threads in |
| ascending priority ``S1``, ``MS2``, ``MS1``, and ``T1`` (the highest priority); two listeners, |
| ``L1`` and ``L2``; and channel A. Suppose ``L1``, ``L2``, ``MS1``, ``MS2``, and ``S1`` observe |
| channel A. |
| |
| .. figure:: images/zbus_publishing_process_example_scenario.svg |
| :alt: ZBus example scenario |
| :width: 45% |
| |
| ZBus VDED execution example scenario. |
| |
| |
| The following code implements channel A. Note the ``struct a_msg`` is illustrative only. |
| |
| .. code-block:: c |
| |
| ZBUS_CHAN_DEFINE(a_chan, /* Name */ |
| struct a_msg, /* Message type */ |
| |
| NULL, /* Validator */ |
| NULL, /* User Data */ |
| ZBUS_OBSERVERS(L1, L2, MS1, MS2, S1), /* observers */ |
| ZBUS_MSG_INIT(0) /* Initial value {0} */ |
| ); |
| |
| |
| In the figure below, the letters indicate some action related to the VDED execution. The X-axis |
| represents the time, and the Y-axis represents the priority of threads. Channel A's message, |
| represented by a speech balloon, occupies only one memory region (shared memory). It appears several |
| times only to illustrate the message at that point in time. |
| |
| |
| .. figure:: images/zbus_publishing_process_example.svg |
| :alt: ZBus publish processing detail |
| :width: 85% |
| |
| ZBus VDED execution detail for priority T1 > MS1 > MS2 > S1. |
| |
| |
| |
| The figure above illustrates the actions performed during the VDED execution when T1 publishes to |
| channel A. Thus, the table below describes the activities (represented by a letter) of the VDED |
| execution. The scenario considers the following priorities: T1 > MS1 > MS2 > S1. T1 has the highest |
| priority. |
| |
| |
| .. list-table:: VDED execution steps in detail for priority T1 > MS1 > MS2 > S1. |
| :widths: 5 65 |
| :header-rows: 1 |
| |
| * - Actions |
| - Description |
| * - a |
| - T1 starts and, at some point, publishes to channel A. |
| * - b |
| - The publishing (VDED) process starts. The VDED locks the channel A. |
| * - c |
| - The VDED copies the T1 message to the channel A message. |
| |
| * - d, e |
| - The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there |
| is a call to the :c:func:`zbus_chan_const_msg` function, which provides a direct constant |
| reference to channel A's message. It is quick, and no copy is needed here. |
| |
| * - f, g |
| - The VDED copies the message and sends that to MS1 and MS2 sequentially. Notice the threads |
| get ready to execute right after receiving the notification. However, they go to a pending |
| state because they have lower priority than T1. |
| * - h |
| - The VDED pushes the notification message to the queue of S1. Notice the thread gets ready to |
| execute right after receiving the notification. However, it goes to a pending state because |
| it cannot access the channel since it is still locked. |
| |
| * - i |
| - VDED finishes the publishing by unlocking channel A. The MS1 leaves the pending state and |
| starts executing. |
| |
| * - j |
| - MS1 finishes execution. The MS2 leaves the pending state and starts executing. |
| |
| * - k |
| - MS2 finishes execution. The S1 leaves the pending state and starts executing. |
| |
| * - l, m, n |
| - The S1 leaves the pending state since channel A is not locked. It gets in the CPU again and |
| starts executing. As it did receive a notification from channel A, it performs a channel read |
| (as simple as lock, memory copy, unlock), continues its execution, and goes out of the CPU. |
| |
| * - o |
| - S1 finishes its workload. |
| |
| |
| The figure below illustrates the actions performed during the VDED execution when T1 publishes to |
| channel A. The scenario considers the following priorities: T1 < MS1 < MS2 < S1. |
| |
| .. figure:: images/zbus_publishing_process_example2.svg |
| :alt: ZBus publish processing detail |
| :width: 85% |
| |
| ZBus VDED execution detail for priority T1 < MS1 < MS2 < S1. |
| |
| Thus, the table below describes the activities (represented by a letter) of the VDED execution. |
| |
| .. list-table:: VDED execution steps in detail for priority T1 < MS1 < MS2 < S1. |
| :widths: 5 65 |
| :header-rows: 1 |
| |
| * - Actions |
| - Description |
| * - a |
| - T1 starts and, at some point, publishes to channel A. |
| * - b |
| - The publishing (VDED) process starts. The VDED locks the channel A. |
| * - c |
| - The VDED copies the T1 message to the channel A message. |
| |
| * - d, e |
| - The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there |
| is a call to the :c:func:`zbus_chan_const_msg` function, which provides a direct constant |
| reference to channel A's message. It is quick, and no copy is needed here. |
| |
| * - f |
| - The VDED copies the message and sends that to MS1. MS1 preempts T1 and starts working. |
| After that, T1 regains the CPU. |
| |
| * - g |
| - The VDED copies the message and sends that to MS2. MS2 preempts T1 and starts working. |
| After that, T1 regains the CPU. |
| |
| * - h |
| - The VDED pushes the notification message to the queue of S1. |
| |
| * - i |
| - VDED finishes the publishing by unlocking channel A. |
| |
| * - j, k, l |
| - The S1 leaves the pending state since channel A is not locked. It gets in the CPU again and |
| starts executing. As it did receive a notification from channel A, it performs a channel read |
| (as simple as lock, memory copy, unlock), continues its execution, and goes out of the CPU. |
| |
| |
| HLP priority boost |
| ------------------ |
| ZBus implements the Highest Locker Protocol that relies on the observers' thread priority to |
| determine a temporary publisher priority. The protocol considers the channel's Highest Observer |
| Priority (HOP); even if the observer is not waiting for a message on the channel, it is considered |
| in the calculation. The VDED will elevate the publisher's priority based on the HOP to ensure small |
| latency and as few preemptions as possible. |
| |
| .. note:: |
| The priority boost is enabled by default. To deactivate it, you must disable the |
| :kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST` configuration (set it to ``n``). |
| |
| .. warning:: |
| ZBus priority boost does not consider runtime observers on the HOP calculations. |
| |
| The figure below illustrates the actions performed during the VDED execution when T1 publishes to |
| channel A. The scenario considers the priority boost feature and the following priorities: T1 < MS1 |
| < MS2 < S1. |
| |
| .. figure:: images/zbus_publishing_process_example_HLP.svg |
| :alt: ZBus publishing process details using priority boost. |
| :width: 85% |
| |
| ZBus VDED execution detail with priority boost enabled and for priority T1 < MS1 < MS2 < S1. |
| |
| To properly use the priority boost, attaching the observer to a thread is necessary. When the |
| subscriber is attached to a thread, it assumes its priority, and the priority boost algorithm will |
| consider the observer's priority. The following code illustrates the thread-attaching function. |
| |
| |
| .. code-block:: c |
| :emphasize-lines: 10 |
| |
| ZBUS_SUBSCRIBER_DEFINE(s1, 4); |
| void s1_thread(void *ptr1, void *ptr2, void *ptr3) |
| { |
| ARG_UNUSED(ptr1); |
| ARG_UNUSED(ptr2); |
| ARG_UNUSED(ptr3); |
| |
| const struct zbus_channel *chan; |
| |
| zbus_obs_attach_to_thread(&s1); |
| |
| while (1) { |
| zbus_sub_wait(&s1, &chan, K_FOREVER); |
| |
| /* Subscriber implementation */ |
| |
| } |
| } |
| K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0); |
| |
| In the above code, :c:func:`zbus_obs_attach_to_thread` sets the ``s1`` observer priority to two, |
| since the thread has that priority. It is possible to reverse that by detaching the observer using |
| :c:func:`zbus_obs_detach_from_thread`. Only enabled observers and observations are considered in |
| the channel HOP calculation. Masking a specific observation of a channel will therefore affect the |
| channel HOP. |
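
The selection rule described above can be sketched in plain C. This is an illustrative model, not the zbus algorithm: the ``hop_`` types, the ``HOP_LOWEST_PRIO`` sentinel, and the function are hypothetical. It assumes that, as in Zephyr, a numerically lower value means a higher priority, and that observers not attached to a thread do not take part in the calculation.

```c
#include <stdbool.h>
#include <stddef.h>

#define HOP_LOWEST_PRIO 127 /* hypothetical sentinel: no observer was counted */

struct hop_observer {
	bool enabled;
	bool attached; /* true once the observer was attached to a thread */
	int priority;  /* lower number means higher priority */
};

struct hop_observation {
	const struct hop_observer *obs;
	bool masked; /* true when this observation is disabled */
};

/* Return the highest priority among the observers that count for this channel. */
static int hop_channel_hop(const struct hop_observation *list, size_t len)
{
	int hop = HOP_LOWEST_PRIO;

	for (size_t i = 0; i < len; ++i) {
		const struct hop_observer *o = list[i].obs;

		if (!o->enabled || !o->attached || list[i].masked) {
			continue; /* disabled, unattached, or masked: ignored */
		}
		if (o->priority < hop) {
			hop = o->priority;
		}
	}

	return hop;
}
```

In this model, masking the observation of the highest-priority observer lowers the channel HOP to the next observer that still counts.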
| |
| In summary, the benefits of the feature are: |
| |
| * The HLP is more effective for zbus than the mutexes priority inheritance; |
| * No bounded priority inversion will happen among the publisher and the observers; |
| * No other threads (that are not involved in the communication) with priority between T1 and S1 can |
| preempt T1, avoiding unbounded priority inversion; |
| * Message subscribers will wait for the VDED to finish the message delivery process. So the VDED |
| execution will be faster and more consistent; |
| * The HLP priority is dynamic and can change in execution; |
| * ZBus operations can be used inside ISRs; |
| * The priority boosting feature can be turned off, and plain semaphores can be used as the channel |
| lock mechanism; |
| * The Highest Locker Protocol's major disadvantage, the Inheritance-related Priority Inversion, is |
| acceptable in the zbus scenario since it will ensure a small bus latency. |
| |
| |
| Limitations |
| =========== |
| |
| Based on the fact that developers can use zbus to solve many different problems, some challenges |
| arise. ZBus will not solve every problem, so it is necessary to analyze the situation to be sure |
| zbus is applicable. For instance, based on the zbus benchmark, it would not be well suited to a |
| high-speed stream of bytes between threads. The `Pipe` kernel object solves this kind of need. |
| |
| Delivery guarantees |
| ------------------- |
| |
| ZBus always delivers the messages to the listeners and message subscribers. However, there are no |
| message delivery guarantees for subscribers because zbus only sends the notification, and the |
| message reading depends on the subscriber's implementation. It is possible to increase the delivery |
| rate by following these design tips: |
| |
| * Keep the listeners as quick as possible (treat them like ISRs). If some processing is needed, |
| consider submitting a work item to a work-queue; |
| * Try to give producers a high priority to avoid losses; |
| * Leave spare CPU for observers to consume data produced; |
| * Consider using message queues or pipes for intensive byte transfers. |
| |
| .. warning:: |
| ZBus uses :zephyr_file:`include/zephyr/net/buf.h` (network buffers) to exchange data with message |
| subscribers. So, carefully choose the configurations |
| :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE` and |
| :kconfig:option:`CONFIG_HEAP_MEM_POOL_SIZE`. They are crucial to a proper VDED execution |
| (delivery guarantee) considering message subscribers. |
| |
| .. warning:: |
| Subscribers receive only a reference to the changed channel. Data loss may be perceived |
| if the channel is published twice before the subscriber reads it, because the second publication |
| overwrites the value from the first. The subscriber will then receive two notifications, but |
| only the latest data is available. |
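
The overwrite described in the warning above can be reproduced with a small plain C model. It is a sketch and not zbus code: the ``loss_`` types and functions are hypothetical, the message is an ``int``, and the notification queue is a fixed-size ring of channel references.

```c
#include <stddef.h>

#define LOSS_QUEUE_DEPTH 4 /* hypothetical notification queue depth */

struct loss_channel {
	int message; /* single shared message, overwritten on every publication */
};

struct loss_subscriber {
	const struct loss_channel *queue[LOSS_QUEUE_DEPTH]; /* references, not copies */
	size_t head;
	size_t count;
};

/* Overwrite the channel message and queue a reference to the channel. */
static void loss_publish(struct loss_channel *chan, struct loss_subscriber *sub, int value)
{
	chan->message = value;

	if (sub->count < LOSS_QUEUE_DEPTH) {
		sub->queue[(sub->head + sub->count) % LOSS_QUEUE_DEPTH] = chan;
		++sub->count;
	}
}

/* Take one notification and read the channel; return -1 if none is pending. */
static int loss_wait_and_read(struct loss_subscriber *sub, int *out)
{
	if (sub->count == 0) {
		return -1;
	}

	const struct loss_channel *chan = sub->queue[sub->head];

	sub->head = (sub->head + 1) % LOSS_QUEUE_DEPTH;
	--sub->count;
	*out = chan->message; /* whatever the channel holds now */

	return 0;
}
```

Publishing the values 1 and 2 before the subscriber runs yields two notifications, and both reads return 2. A message subscriber avoids this because it receives a copy of each message.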
| |
| |
| |
| .. _zbus delivery sequence: |
| |
| Message delivery sequence |
| ------------------------- |
| |
| The message delivery follows this precedence: |
| |
| #. Observers defined in a channel using the :c:macro:`ZBUS_CHAN_DEFINE` (following the definition |
| sequence); |
| #. Observers defined using the :c:macro:`ZBUS_CHAN_ADD_OBS` based on the sequence priority |
| (parameter of the macro); |
| #. Finally, runtime observers, in the order they were added using |
| :c:func:`zbus_chan_add_obs`. |
| |
| .. note:: |
| The VDED will ignore all disabled observers or observations. |
| |
| Usage |
| ***** |
| |
| ZBus operation depends on channels and observers. Therefore, it is necessary to determine a |
| channel's message type and observer list during the channel definition. A message is a regular C |
| struct; the observer can be a subscriber (asynchronous), a message subscriber (asynchronous), or a |
| listener (synchronous). |
| |
| The following code defines and initializes a regular channel and its dependencies. This channel |
| exchanges accelerometer data, for example. |
| |
| .. code-block:: c |
| |
| struct acc_msg { |
| int x; |
| int y; |
| int z; |
| }; |
| |
| ZBUS_CHAN_DEFINE(acc_chan, /* Name */ |
| struct acc_msg, /* Message type */ |
| |
| NULL, /* Validator */ |
| NULL, /* User Data */ |
| ZBUS_OBSERVERS(my_listener, my_subscriber, |
| my_msg_subscriber), /* observers */ |
| ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial value */ |
| ); |
| |
| void listener_callback_example(const struct zbus_channel *chan) |
| { |
| const struct acc_msg *acc; |
| if (&acc_chan == chan) { |
| acc = zbus_chan_const_msg(chan); // Direct message access |
| LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z); |
| } |
| } |
| |
| ZBUS_LISTENER_DEFINE(my_listener, listener_callback_example); |
| |
| ZBUS_LISTENER_DEFINE(my_listener2, listener_callback_example); |
| |
| ZBUS_CHAN_ADD_OBS(acc_chan, my_listener2, 3); |
| |
| ZBUS_SUBSCRIBER_DEFINE(my_subscriber, 4); |
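| static void subscriber_task(void *ptr1, void *ptr2, void *ptr3) |
| { |
| ARG_UNUSED(ptr1); |
| ARG_UNUSED(ptr2); |
| ARG_UNUSED(ptr3); |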
| const struct zbus_channel *chan; |
| |
| while (!zbus_sub_wait(&my_subscriber, &chan, K_FOREVER)) { |
| struct acc_msg acc = {0}; |
| |
| if (&acc_chan == chan) { |
| // Indirect message access |
| zbus_chan_read(&acc_chan, &acc, K_NO_WAIT); |
| LOG_DBG("From subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z); |
| } |
| } |
| } |
| K_THREAD_DEFINE(subscriber_task_id, 512, subscriber_task, NULL, NULL, NULL, 3, 0, 0); |
| |
| ZBUS_MSG_SUBSCRIBER_DEFINE(my_msg_subscriber); |
| static void msg_subscriber_task(void *ptr1, void *ptr2, void *ptr3) |
| { |
| ARG_UNUSED(ptr1); |
| ARG_UNUSED(ptr2); |
| ARG_UNUSED(ptr3); |
| const struct zbus_channel *chan; |
| |
| struct acc_msg acc = {0}; |
| |
| while (!zbus_sub_wait_msg(&my_msg_subscriber, &chan, &acc, K_FOREVER)) { |
| if (&acc_chan == chan) { |
| LOG_INF("From msg subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z); |
| } |
| } |
| } |
| K_THREAD_DEFINE(msg_subscriber_task_id, 1024, msg_subscriber_task, NULL, NULL, NULL, 3, 0, 0); |
| |
| |
| |
| It is possible to add static observers to a channel using :c:macro:`ZBUS_CHAN_ADD_OBS`. We call |
| that a post-definition static observer. The macro enables us to indicate an initialization |
| priority that affects the observers' initialization order. The sequence priority parameter only |
| affects the post-definition static observers. It is not possible to override the message delivery |
| sequence of the static observers. |
| |
| .. note:: |
| It is unnecessary to claim/lock a channel before accessing the message inside the listener since |
| the event dispatcher calls listeners with the notifying channel already locked. Subscribers, |
| however, must claim/lock that or use regular read operations to access the message after being |
| notified. |
| |
| |
| Channels can have a `validator function` that enables a channel to accept only valid messages. |
| Publish attempts invalidated by hard channels will return immediately with an error code. This |
| allows original creators of a channel to exert some authority over other developers/publishers who |
| may want to piggy-back on their channels. The following code defines and initializes a :dfn:`hard |
| channel` and its dependencies. Only valid messages can be published to a :dfn:`hard channel`. It is |
| possible because a `validator function` was passed to the channel's definition. In this example, |
| only messages with ``move`` equal to 0, -1, or 1 are valid. The publish function will reject all |
| other values of ``move``. |
| |
| .. code-block:: c |
| |
| struct control_msg { |
| int move; |
| }; |
| |
| bool control_validator(const void* msg, size_t msg_size) { |
| const struct control_msg* cm = msg; |
| bool is_valid = (cm->move == -1) || (cm->move == 0) || (cm->move == 1); |
| return is_valid; |
| } |
| |
| static int message_count = 0; |
| |
| ZBUS_CHAN_DEFINE(control_chan, /* Name */ |
| struct control_msg, /* Message type */ |
| |
| control_validator, /* Validator */ |
| &message_count, /* User data */ |
| ZBUS_OBSERVERS_EMPTY, /* observers */ |
| ZBUS_MSG_INIT(.move = 0) /* Initial value */ |
| ); |
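
The effect of a validator on publishing can be sketched in plain C without the bus. This is a minimal model, not the zbus implementation: ``hard_channel`` and ``hard_publish`` are hypothetical names, and the ``-ENOMSG`` return value for a rejected message is an assumption made for this sketch, so check the :c:func:`zbus_chan_pub` API reference for the actual error codes.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

struct control_msg {
	int move;
};

/* Same rule as the validator above: only -1, 0, and 1 are accepted. */
static bool control_validator(const void *msg, size_t msg_size)
{
	const struct control_msg *cm = msg;

	(void)msg_size; /* unused in this simple check */

	return (cm->move == -1) || (cm->move == 0) || (cm->move == 1);
}

/* Hypothetical stand-in for a channel that owns a validator. */
struct hard_channel {
	struct control_msg message;
	bool (*validator)(const void *msg, size_t msg_size);
};

/* Reject invalid messages before touching the channel's message. */
static int hard_publish(struct hard_channel *chan, const struct control_msg *msg)
{
	if (chan->validator != NULL && !chan->validator(msg, sizeof(*msg))) {
		return -ENOMSG; /* assumed error code, see the lead-in */
	}

	memcpy(&chan->message, msg, sizeof(chan->message));

	return 0;
}
```

A rejected publication leaves the previous channel message untouched, so observers never see an invalid value.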
| |
| The following sections describe in detail how to use zbus features. |
| |
| |
| .. _publishing to a channel: |
| |
| Publishing to a channel |
| ======================= |
| |
| Messages are published to a channel in zbus by calling :c:func:`zbus_chan_pub`. For example, the |
| following code builds on the examples above and publishes to channel ``acc_chan``. The code is |
| trying to publish the message ``acc1`` to channel ``acc_chan``, and it will wait up to one second |
| for the message to be published. Otherwise, the operation fails. As can be inferred from the code |
| sample, it's OK to use stack allocated messages since VDED copies the data internally. |
| |
| .. code-block:: c |
| |
| struct acc_msg acc1 = {.x = 1, .y = 1, .z = 1}; |
| zbus_chan_pub(&acc_chan, &acc1, K_SECONDS(1)); |
| |
| .. warning:: |
| Only use this function inside an ISR with a :c:macro:`K_NO_WAIT` timeout. |
| |
| .. _reading from a channel: |
| |
| Reading from a channel |
| ====================== |
| |
| Messages are read from a channel in zbus by calling :c:func:`zbus_chan_read`. For example, the |
| following code tries to read the channel ``acc_chan`` and will wait up to 500 milliseconds to |
| read the message. Otherwise, the operation fails. |
| |
| .. code-block:: c |
| |
| struct acc_msg acc = {0}; |
| zbus_chan_read(&acc_chan, &acc, K_MSEC(500)); |
| |
| .. warning:: |
| Only use this function inside an ISR with a :c:macro:`K_NO_WAIT` timeout. |
| |
| .. warning:: |
| Choose the timeout of :c:func:`zbus_chan_read` after receiving a notification from |
| :c:func:`zbus_sub_wait` carefully because the channel will always be unavailable during the VDED |
| execution. Using ``K_NO_WAIT`` for reading is highly likely to return a timeout error if there |
| is more than one subscriber. For example, consider the VDED illustration again and notice how |
| ``S1`` read attempts would definitely fail with ``K_NO_WAIT``. For more details, check |
| the `Virtual Distributed Event Dispatcher`_ section. |
| |
| Notifying a channel |
| =================== |
| |
| It is possible to force zbus to notify a channel's observers by calling :c:func:`zbus_chan_notify`. |
| For example, the following code builds on the examples above and forces a notification for the |
| channel ``acc_chan``. Note this can send events with no message, which does not require any data |
| exchange. See the code example under `Claim and finish a channel`_ where this may become useful. |
| |
| .. code-block:: c |
| |
| zbus_chan_notify(&acc_chan, K_NO_WAIT); |
| |
| .. warning:: |
| Only use this function inside an ISR with a :c:macro:`K_NO_WAIT` timeout. |
| |
| Declaring channels and observers |
| ================================ |
| |
| To access channels or observers from files other than their defining files, it is necessary to |
| declare them by calling :c:macro:`ZBUS_CHAN_DECLARE` and :c:macro:`ZBUS_OBS_DECLARE`. In other |
| words, zbus channel definitions and declarations with the same channel names in different files |
| point to the same (global) channel. Thus, developers should take care not to reuse the name of an |
| existing channel when naming a new one, or linking will fail. It is possible to declare more than |
| one channel or observer in the same call. The following code builds on the examples above and |
| declares the defined channels and observers. |
| |
| .. code-block:: c |
| |
| ZBUS_OBS_DECLARE(my_listener, my_subscriber); |
| ZBUS_CHAN_DECLARE(acc_chan, version_chan); |
| |
| |
| Iterating over channels and observers |
| ===================================== |
| |
| ZBus subsystem also implements :ref:`Iterable Sections <iterable_sections_api>` for channels and |
| observers, for which there are supporting APIs like :c:func:`zbus_iterate_over_channels`, |
| :c:func:`zbus_iterate_over_channels_with_user_data`, :c:func:`zbus_iterate_over_observers` and |
| :c:func:`zbus_iterate_over_observers_with_user_data`. This feature enables developers to call a |
| procedure over all declared channels, where the procedure parameter is a :c:struct:`zbus_channel`. |
| The execution sequence is in the alphabetical name order of the channels (see :ref:`Iterable |
| Sections <iterable_sections_api>` documentation for details). ZBus also implements this feature for |
| :c:struct:`zbus_observer`. |
| |
| .. code-block:: c |
| |
| static bool print_channel_data_iterator(const struct zbus_channel *chan, void *user_data) |
| { |
| int *count = user_data; |
| |
| LOG_INF("%d - Channel %s:", *count, zbus_chan_name(chan)); |
| LOG_INF(" Message size: %d", zbus_chan_msg_size(chan)); |
| LOG_INF(" Observers:"); |
| |
| ++(*count); |
| |
| struct zbus_channel_observation *observation; |
| |
| for (int16_t i = *chan->observers_start_idx, limit = *chan->observers_end_idx; i < limit; |
| ++i) { |
| STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); |
| |
| LOG_INF(" - %s", observation->obs->name); |
| } |
| |
| struct zbus_observer_node *obs_nd, *tmp; |
| |
| SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->observers, obs_nd, tmp, node) { |
| LOG_INF(" - %s", obs_nd->obs->name); |
| } |
| |
| return true; |
| } |
| |
| static bool print_observer_data_iterator(const struct zbus_observer *obs, void *user_data) |
| { |
| int *count = user_data; |
| |
| LOG_INF("%d - %s %s", *count, obs->queue ? "Subscriber" : "Listener", zbus_obs_name(obs)); |
| |
| ++(*count); |
| |
| return true; |
| } |
| |
| int main(void) |
| { |
| int count = 0; |
| |
| LOG_INF("Channel list:"); |
| |
| zbus_iterate_over_channels_with_user_data(print_channel_data_iterator, &count); |
| |
| count = 0; |
| |
| LOG_INF("Observers list:"); |
| |
| zbus_iterate_over_observers_with_user_data(print_observer_data_iterator, &count); |
| |
| return 0; |
| } |
| |
| |
| The code will log the following output: |
| |
| .. code-block:: console |
| |
| D: Channel list: |
| D: 0 - Channel acc_chan: |
| D: Message size: 12 |
| D: Observers: |
| D: - my_listener |
| D: - my_subscriber |
| D: 1 - Channel version_chan: |
| D: Message size: 4 |
| D: Observers: |
| D: Observers list: |
| D: 0 - Listener my_listener |
| D: 1 - Subscriber my_subscriber |
| |
| |
| .. _Claim and finish a channel: |
| |
| Advanced channel control |
| ======================== |
| |
| ZBus was designed to be as flexible and extensible as possible. Thus, there are some features |
| designed to provide some control and extensibility to the bus. |
| |
| Listeners message access |
| ------------------------ |
| |
| For performance reasons, listeners can access the message of the channel that triggered them |
| directly, since the channel is already locked for them. To access the channel's message, the |
| listener should use :c:func:`zbus_chan_const_msg`, because the channel passed as an argument to |
| the listener function is a constant pointer to the channel. The const pointer return type tells |
| developers not to modify the message. |
| |
| .. code-block:: c |
| |
| void listener_callback_example(const struct zbus_channel *chan) |
| { |
| const struct acc_msg *acc; |
| if (&acc_chan == chan) { |
| acc = zbus_chan_const_msg(chan); // Use this |
| // instead of zbus_chan_read(chan, &acc, K_MSEC(200)) |
| // or zbus_chan_msg(chan) |
| |
| LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z); |
| } |
| } |
| |
| User Data |
| --------- |
| It is possible to attach custom data to a channel through its ``user_data`` field for various |
| purposes, such as storing channel metadata. To do so, pass a pointer as the ``user_data`` argument |
| of the channel definition macro; any code with access to the channel can then retrieve it. Note |
| that each channel has its own ``user_data``, and that access to it is not thread-safe. For |
| thread-safe access to ``user_data``, see the next section. |
| |
| |
| Claim and finish a channel |
| -------------------------- |
| |
| To give more control over channels, zbus provides two functions: :c:func:`zbus_chan_claim` and |
| :c:func:`zbus_chan_finish`. With these functions, it is possible to access the channel's metadata |
| safely. While a channel is claimed, no other action can be performed on it. After the channel is |
| finished, all actions are available again. |
| |
| .. warning:: |
| Never change the fields of the channel struct directly. It may cause zbus behavior |
| inconsistencies and scheduling issues. |
| |
| .. warning:: |
| Inside an ISR, only call :c:func:`zbus_chan_claim` with a :c:macro:`K_NO_WAIT` timeout. |
| |
| The following code builds on the examples above and claims ``acc_chan`` to update the channel's |
| ``user_data``. Suppose we want to count how many messages are exchanged through the channel, and |
| the ``user_data`` was defined to point to a 32-bit integer. This code could be added to the |
| listener code described above. |
| |
| .. code-block:: c |
| |
| if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) { |
| int *message_counting = (int *) zbus_chan_user_data(&acc_chan); |
| *message_counting += 1; |
| zbus_chan_finish(&acc_chan); |
| } |
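| |
| The claim/finish discipline used above can also be illustrated outside Zephyr. The following is a |
| minimal host-side sketch in plain C, not zbus code: ``model_chan``, ``model_chan_claim``, |
| ``model_chan_finish``, and ``count_exchange`` are hypothetical stand-ins introduced only for this |
| illustration. The claim models a :c:macro:`K_NO_WAIT` attempt and is assumed to fail with |
| ``-EBUSY`` when the channel is already claimed. |
| |

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a channel: NOT the real struct zbus_channel. */
struct model_chan {
	bool claimed;    /* true between a successful claim and the finish */
	void *user_data; /* pointer supplied when the channel is defined */
};

/* Models a no-wait claim: 0 on success, -EBUSY if already claimed. */
static int model_chan_claim(struct model_chan *chan)
{
	if (chan->claimed) {
		return -EBUSY;
	}
	chan->claimed = true;
	return 0;
}

/* Releases the claim so that other actions are possible again. */
static void model_chan_finish(struct model_chan *chan)
{
	chan->claimed = false;
}

/* Increments the 32-bit counter behind user_data while the channel is claimed. */
static int count_exchange(struct model_chan *chan)
{
	int err;

	assert(chan != NULL && chan->user_data != NULL);

	err = model_chan_claim(chan);
	if (err != 0) {
		return err; /* someone else holds the claim; counter untouched */
	}
	*(uint32_t *)chan->user_data += 1;
	model_chan_finish(chan);
	return 0;
}
```

| |
| The counter is only touched between a successful claim and the matching finish, which is the same |
| ordering the zbus snippet above relies on. |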
| |
| The following code behaves exactly like the code in :ref:`publishing to a channel`. |
| |
| .. code-block:: c |
| |
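| if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) { |
| struct acc_msg *acc1 = (struct acc_msg *) zbus_chan_msg(&acc_chan); |
| /* acc1 is a pointer to the channel's message, so use '->' */ |
| acc1->x = 1; |
| acc1->y = 1; |
| acc1->z = 1; |
| zbus_chan_finish(&acc_chan); |
| /* Notify the observers only after the channel is finished */ |
| zbus_chan_notify(&acc_chan, K_SECONDS(1)); |
| } |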
| |
| The following code behaves exactly like the code in :ref:`reading from a channel`. |
| |
| .. code-block:: c |
| |
| if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) { |
| const struct acc_msg *acc1 = (const struct acc_msg *) zbus_chan_const_msg(&acc_chan); |
| // access the acc_msg fields directly. |
| zbus_chan_finish(&acc_chan); |
| } |
| |
| |
| Runtime observer registration |
| ----------------------------- |
| |
| It is possible to add observers to channels at runtime. This feature uses the heap to allocate the |
| observer nodes dynamically, so the heap size limits the number of dynamic observers zbus can |
| create. To enable the feature, set :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS`. The heap size |
| can be adjusted with :kconfig:option:`CONFIG_HEAP_MEM_POOL_SIZE`. The following example |
| illustrates the runtime registration usage. |
| |
| |
| |
| .. code-block:: c |
| |
| ZBUS_LISTENER_DEFINE(my_listener, callback); |
| // ... |
| void thread_entry(void) |
| { |
| // ... |
| /* Add the observer to channel chan1 */ |
| zbus_chan_add_obs(&chan1, &my_listener, K_NO_WAIT); |
| /* Remove the observer from channel chan1 */ |
| zbus_chan_rm_obs(&chan1, &my_listener, K_NO_WAIT); |
| } |
| |
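| As a rough illustration, the options named above could be set with a ``prj.conf`` fragment like |
| the one below. The heap size is an arbitrary placeholder chosen for this sketch, not a |
| recommendation; size it for the number of observers the application registers at runtime. |
| |

```cfg
# Enable zbus and the runtime observer registration feature
CONFIG_ZBUS=y
CONFIG_ZBUS_RUNTIME_OBSERVERS=y
# Placeholder value (assumption): heap backing the dynamically allocated observer nodes
CONFIG_HEAP_MEM_POOL_SIZE=1024
```

| |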
| |
| Samples |
| ******* |
| |
| For a complete overview of zbus usage, take a look at the samples. The following samples are |
| available: |
| |
| * :zephyr:code-sample:`zbus-hello-world` illustrates the code used above in action; |
| * :zephyr:code-sample:`zbus-work-queue` shows how to define and use different kinds of observers. |
| Note that it includes an example of deferring processing to a work queue instead of executing it |
| in the listener; |
| * :zephyr:code-sample:`zbus-msg-subscriber` illustrates how to use message subscribers; |
| * :zephyr:code-sample:`zbus-dyn-channel` demonstrates how to exchange dynamically allocated data |
| through zbus; |
| * :zephyr:code-sample:`zbus-uart-bridge` shows an example of sending the channel operations to a |
| host over a serial connection; |
| * :zephyr:code-sample:`zbus-remote-mock` illustrates how to implement an external mock (on the host) |
| to send and receive messages to and from the bus; |
| * :zephyr:code-sample:`zbus-priority-boost` illustrates the zbus priority boost feature with a |
| priority inversion scenario; |
| * :zephyr:code-sample:`zbus-runtime-obs-registration` illustrates a way of using the runtime |
| observer registration feature; |
| * :zephyr:code-sample:`zbus-confirmed-channel` shows a way to implement a confirmed channel using |
| only subscribers; |
| * :zephyr:code-sample:`zbus-benchmark` implements a benchmark with different combinations of inputs. |
| |
| Suggested Uses |
| ************** |
| |
| Use zbus to transfer data (messages) between threads in one-to-one, one-to-many, and many-to-many |
| topologies, synchronously or asynchronously. Choosing the proper observer type is crucial. Use |
| subscribers for scenarios that can tolerate message losses and duplications; when they cannot, use |
| message subscribers (if you need a thread) or listeners (if you need to be lean and fast). When |
| using a listener, another asynchronous message processing mechanism (like :ref:`message queues |
| <message_queues_v2>`) may be necessary to retain the pending message until it gets processed. |
| |
| .. note:: |
| Zbus can be used to transfer streams from the producer to the consumer. However, this can |
| increase zbus' communication latency, so consider a Pipe as an alternative for this |
| communication topology. |
| |
| Configuration Options |
| ********************* |
| |
| To enable zbus, set the :kconfig:option:`CONFIG_ZBUS` option. |
| |
| Related configuration options: |
| |
| * :kconfig:option:`CONFIG_ZBUS_PRIORITY_BOOST` enables the zbus Highest Locker Protocol |
| implementation; |
| * :kconfig:option:`CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY` determines the :c:macro:`SYS_INIT` |
| priority used by zbus to organize the channel observations by channel; |
| * :kconfig:option:`CONFIG_ZBUS_CHANNEL_NAME` makes the channel names available inside the |
| channels' metadata. The log uses this information to show the channels' names; |
| * :kconfig:option:`CONFIG_ZBUS_OBSERVER_NAME` makes the observer names available inside the |
| observers' metadata; |
| * :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER` enables the message subscriber observer type; |
| * :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC` uses the heap to allocate message |
| buffers; |
| * :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC` uses the stack to allocate message |
| buffers; |
| * :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE` sets the number of message |
| buffers available for simultaneous use; |
| * :kconfig:option:`CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE` sets the size of the |
| largest zbus channel message that can be transported in a message buffer; |
| * :kconfig:option:`CONFIG_ZBUS_RUNTIME_OBSERVERS` enables the runtime observer registration. |
| |
| API Reference |
| ************* |
| |
| .. doxygengroup:: zbus_apis |