| .. _pipes_v2: |
| |
| Pipes |
| ##### |
| |
| A :dfn:`pipe` is a kernel object that allows a thread to send a byte stream |
| to another thread. Pipes enable efficient inter-thread communication and can |
| be used to synchronously transfer chunks of data in whole or in part. |
| |
| .. contents:: |
| :local: |
| :depth: 2 |
| |
| Concepts |
| ******** |
| |
| Any number of pipes can be defined, limited only by available RAM. Each pipe |
| is referenced by its memory address. |
| |
| A pipe has the following key property: |
| |
| * A **size** that indicates the capacity of the pipe's ring buffer. Note that a |
| size of zero defines a pipe with no ring buffer. |
| |
| A pipe must be initialized before it can be used. When initialized, the pipe |
| is empty. |
| |
| Threads interact with the pipe as follows: |
| |
| - **Writing**: Data is synchronously written, either in whole or in part, to |
| a pipe by a thread. Accepted data is either copied directly to the waiting |
| reader(s) or to the pipe's ring buffer. If the ring buffer is full or simply |
| absent, the operation blocks until sufficient space becomes available or |
| the specified timeout expires. |
| |
| - **Reading**: Data is synchronously read, either in whole or in part, from a |
| pipe by a thread. The data is copied either from the pipe's ring buffer or |
| directly from the waiting sender(s). If the ring buffer is empty or simply |
| absent, the operation blocks until data becomes available or the specified |
| timeout expires. |
| |
| - **Resetting**: A thread can reset a pipe, which resets its internal state and |
| ends all pending read and write operations with an error code. |
| |
| Pipes are well-suited for scenarios like producer-consumer patterns or |
| streaming data between threads. |
| |
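The partial-transfer behaviour described above can be illustrated with a small, single-threaded model in plain C. This is only a sketch under stated assumptions: ``model_pipe``, ``model_write``, ``model_read``, ``model_reset`` and ``model_demo`` are hypothetical names, not part of the kernel API, and the model neither blocks nor hands data directly to a waiting thread. It only shows how a fixed-size ring buffer accepts or returns fewer bytes than requested.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical single-threaded model of a pipe's ring buffer (not kernel code). */
struct model_pipe {
	uint8_t *buf;  /* backing storage; may be NULL when size is 0 */
	size_t size;   /* capacity in bytes */
	size_t head;   /* index of the oldest stored byte */
	size_t used;   /* number of bytes currently stored */
};

/* Accept as many bytes as currently fit; return how many were accepted. */
static size_t model_write(struct model_pipe *p, const uint8_t *data, size_t len)
{
	size_t n = 0;

	while (n < len && p->used < p->size) {
		p->buf[(p->head + p->used) % p->size] = data[n++];
		p->used++;
	}
	return n;
}

/* Hand out up to len stored bytes in FIFO order; return how many were read. */
static size_t model_read(struct model_pipe *p, uint8_t *out, size_t len)
{
	size_t n = 0;

	while (n < len && p->used > 0) {
		out[n++] = p->buf[p->head];
		p->head = (p->head + 1) % p->size;
		p->used--;
	}
	return n;
}

/* Drop all buffered data, returning the model to its initial empty state. */
static void model_reset(struct model_pipe *p)
{
	p->head = 0;
	p->used = 0;
}

/* Exercise a 4-byte model pipe; returns 0 when every step behaves as expected. */
static int model_demo(void)
{
	uint8_t storage[4];
	uint8_t out[8];
	struct model_pipe p = { storage, sizeof(storage), 0, 0 };

	/* Only 4 of the 6 bytes fit: a partial write. */
	assert(model_write(&p, (const uint8_t *)"abcdef", 6) == 4);
	assert(model_read(&p, out, 2) == 2 && memcmp(out, "ab", 2) == 0);
	/* The freed space is reused; the write wraps around the buffer end. */
	assert(model_write(&p, (const uint8_t *)"ef", 2) == 2);
	/* Asking for 8 bytes yields the 4 that are stored: a partial read. */
	assert(model_read(&p, out, 8) == 4 && memcmp(out, "cdef", 4) == 0);
	model_reset(&p);
	assert(model_read(&p, out, 1) == 0);
	return 0;
}
```

In this model a size of zero means ``model_write`` never accepts anything, which loosely corresponds to a pipe with no ring buffer, where data can only move when a reader and a writer meet.
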
| Implementation |
| ************** |
| |
| A pipe is defined using a variable of type :c:struct:`k_pipe` and a |
| byte buffer. The pipe must then be initialized by calling :c:func:`k_pipe_init`. |
| |
| The following code defines and initializes an empty pipe with a ring buffer |
| capable of holding 100 bytes, aligned to a 4-byte boundary: |
| |
| .. code-block:: c |
| |
| uint8_t __aligned(4) my_ring_buffer[100]; |
| struct k_pipe my_pipe; |
| |
| k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer)); |
| |
| Alternatively, a pipe can be defined and initialized at compile time using |
| the :c:macro:`K_PIPE_DEFINE` macro, which defines both the pipe and its |
| ring buffer: |
| |
| .. code-block:: c |
| |
| K_PIPE_DEFINE(my_pipe, 100, 4); |
| |
| This has the same effect as the code above. |
| |
| When no ring buffer is used, the buffer pointer argument should be NULL and |
| the size argument should be 0. |
| |
| Writing to a Pipe |
| ================= |
| |
| Data is added to a pipe by calling :c:func:`k_pipe_write`. |
| |
| The following example demonstrates using a pipe to send data from a producer |
| thread to one or more consumer threads. The write uses ``K_NO_WAIT``, so a call |
| that cannot be satisfied returns immediately instead of blocking; pass a |
| different timeout to make the producer wait for space in the ring buffer. |
| |
| .. code-block:: c |
| |
| struct message_header { |
| size_t num_data_bytes; /* Example field */ |
| ... |
| }; |
| |
| void producer_thread(void) |
| { |
| int rc; |
| uint8_t data[128]; /* Scratch buffer filled by make_message() */ |
| size_t total_size; |
| size_t bytes_written; |
| |
| while (1) { |
| /* Craft message to send in the pipe */ |
| make_message(data, &total_size); |
| bytes_written = 0; |
| |
| /* Write data to the pipe, handling partial writes */ |
| while (bytes_written < total_size) { |
| rc = k_pipe_write(&my_pipe, &data[bytes_written], total_size - bytes_written, K_NO_WAIT); |
| |
| if (rc < 0) { |
| /* Error occurred */ |
| ... |
| break; |
| } else { |
| /* Partial or full write succeeded; adjust for next iteration */ |
| bytes_written += rc; |
| } |
| } |
| |
| /* Reset bytes_written for the next message */ |
| bytes_written = 0; |
| ... |
| } |
| } |
| |
| Reading from a Pipe |
| =================== |
| |
| Data is retrieved from the pipe by calling :c:func:`k_pipe_read`. |
| |
| The following example builds on the producer thread example above. It shows |
| a consumer thread that processes data generated by the producer. |
| |
| .. code-block:: c |
| |
| struct message_header { |
| size_t num_data_bytes; /* Example field */ |
| ... |
| }; |
| |
| void consumer_thread(void) |
| { |
| int rc; |
| uint8_t buffer[128]; |
| size_t bytes_read = 0; |
| struct message_header *header = (struct message_header *)buffer; |
| |
| while (1) { |
| /* Step 1: Read the message header */ |
| bytes_read = 0; |
| read_header: |
| while (bytes_read < sizeof(*header)) { |
| rc = k_pipe_read(&my_pipe, &buffer[bytes_read], sizeof(*header) - bytes_read, K_NO_WAIT); |
| |
| if (rc < 0) { |
| /* Error occurred */ |
| ... |
| goto read_header; |
| } |
| |
| /* Adjust for partial reads */ |
| bytes_read += rc; |
| } |
| |
| /* Step 2: Read the message body */ |
| bytes_read = 0; |
| while (bytes_read < header->num_data_bytes) { |
| rc = k_pipe_read(&my_pipe, &buffer[sizeof(*header) + bytes_read], header->num_data_bytes - bytes_read, K_NO_WAIT); |
| |
| if (rc < 0) { |
| /* Error occurred: discard progress and start over with a new header */ |
| ... |
| bytes_read = 0; |
| goto read_header; |
| } |
| |
| /* Adjust for partial reads */ |
| bytes_read += rc; |
| } |
| /* Successfully received the complete message */ |
| } |
| } |
| |
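The header-then-body framing used by the consumer can also be tried out without a kernel. The following plain C sketch reassembles one message from chunks of arbitrary size, which is what partial reads amount to. All names other than ``message_header`` and ``num_data_bytes`` (``frame_rx``, ``frame_feed``, ``frame_demo``) are hypothetical, the header is assumed to contain only the example field, and the check against the 128-byte buffer is an added assumption rather than something the example above performs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Header with only the example field; a real header may carry more. */
struct message_header {
	size_t num_data_bytes;
};

/* Hypothetical reassembly state for one message (header followed by body). */
struct frame_rx {
	uint8_t buffer[128];
	size_t filled; /* bytes of the current message received so far */
};

/*
 * Feed one chunk of received bytes. Returns 1 when a complete message is in
 * rx->buffer, 0 when more bytes are needed, and -1 when the announced body
 * would not fit. *consumed reports how many bytes of the chunk were used.
 */
static int frame_feed(struct frame_rx *rx, const uint8_t *chunk, size_t len,
		      size_t *consumed)
{
	struct message_header hdr;
	size_t want, take;

	assert(rx != NULL && consumed != NULL);
	*consumed = 0;
	while (1) {
		if (rx->filled < sizeof(hdr)) {
			want = sizeof(hdr); /* still collecting the header */
		} else {
			memcpy(&hdr, rx->buffer, sizeof(hdr));
			if (hdr.num_data_bytes > sizeof(rx->buffer) - sizeof(hdr)) {
				return -1;
			}
			want = sizeof(hdr) + hdr.num_data_bytes;
			if (rx->filled == want) {
				return 1;
			}
		}
		if (*consumed == len) {
			return 0; /* chunk exhausted, message incomplete */
		}
		take = want - rx->filled;
		if (take > len - *consumed) {
			take = len - *consumed;
		}
		memcpy(&rx->buffer[rx->filled], &chunk[*consumed], take);
		rx->filled += take;
		*consumed += take;
	}
}

/* Deliver a 5-byte message in 3-byte chunks; returns 0 on success. */
static int frame_demo(void)
{
	uint8_t wire[sizeof(struct message_header) + 5];
	struct message_header hdr = { 5 };
	struct frame_rx rx = { {0}, 0 };
	size_t off = 0, used = 0;
	int done = 0;

	memcpy(wire, &hdr, sizeof(hdr));
	memcpy(&wire[sizeof(hdr)], "hello", 5);
	while (done == 0 && off < sizeof(wire)) {
		size_t n = (sizeof(wire) - off < 3) ? sizeof(wire) - off : 3;

		done = frame_feed(&rx, &wire[off], n, &used);
		off += used;
	}
	if (done != 1 || off != sizeof(wire)) {
		return -1;
	}
	return memcmp(&rx.buffer[sizeof(hdr)], "hello", 5);
}
```

Keeping the progress counter in a separate state structure means an interrupted transfer can resume where it stopped, and the length check guards against a header that announces more data than the buffer can hold.
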
| Resetting a Pipe |
| ================ |
| |
| The pipe can be reset by calling :c:func:`k_pipe_reset`. Resetting a pipe |
| resets its internal state and ends all pending operations with an error code. |
| |
| The following example demonstrates resetting a pipe in response to a critical |
| error: |
| |
| .. code-block:: c |
| |
| void monitor_thread(void) |
| { |
| while (1) { |
| ... |
| /* Critical error detected: reset the pipe. */ |
| k_pipe_reset(&my_pipe); |
| ... |
| } |
| } |
| |
| Suggested Uses |
| ************** |
| |
| Pipes are useful for sending streams of data between threads. Typical |
| applications include: |
| |
| - Implementing producer-consumer patterns. |
| - Streaming logs or packets between threads. |
| - Handling variable-length message passing in real-time systems. |
| |
| API Reference |
| ************* |
| |
| .. doxygengroup:: pipe_apis |