blob: 78b0613ae9614387895605d573b68b8a29797785 [file] [log] [blame]
/*
* Copyright (c) 2016 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @file
*
* @brief Pipes
*/
#include <kernel.h>
#include <kernel_structs.h>
#include <debug/object_tracing_common.h>
#include <toolchain.h>
#include <linker/sections.h>
#include <wait_q.h>
#include <misc/dlist.h>
#include <init.h>
#include <syscall_handler.h>
struct k_pipe_desc {
unsigned char *buffer; /* Position in src/dest buffer */
size_t bytes_to_xfer; /* # bytes left to transfer */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
struct k_mem_block *block; /* Pointer to memory block */
struct k_mem_block copy_block; /* For backwards compatibility */
struct k_sem *sem; /* Semaphore to give if async */
#endif
};
struct k_pipe_async {
struct _thread_base thread; /* Dummy thread object */
struct k_pipe_desc desc; /* Pipe message descriptor */
};
extern struct k_pipe _k_pipe_list_start[];
extern struct k_pipe _k_pipe_list_end[];
#ifdef CONFIG_OBJECT_TRACING
struct k_pipe *_trace_list_k_pipe;
#endif /* CONFIG_OBJECT_TRACING */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
/* Array of asynchronous message descriptors */
static struct k_pipe_async __noinit async_msg[CONFIG_NUM_PIPE_ASYNC_MSGS];
/* stack of unused asynchronous message descriptors */
K_STACK_DEFINE(pipe_async_msgs, CONFIG_NUM_PIPE_ASYNC_MSGS);
/* Allocate an asynchronous message descriptor */
static void _pipe_async_alloc(struct k_pipe_async **async)
{
k_stack_pop(&pipe_async_msgs, (u32_t *)async, K_FOREVER);
}
/* Free an asynchronous message descriptor */
static void _pipe_async_free(struct k_pipe_async *async)
{
k_stack_push(&pipe_async_msgs, (u32_t)async);
}
/* Finish an asynchronous operation */
static void _pipe_async_finish(struct k_pipe_async *async_desc)
{
/*
* An asynchronous operation is finished with the scheduler locked
* to prevent the called routines from scheduling a new thread.
*/
k_mem_pool_free(async_desc->desc.block);
if (async_desc->desc.sem != NULL) {
k_sem_give(async_desc->desc.sem);
}
_pipe_async_free(async_desc);
}
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0) || \
defined(CONFIG_OBJECT_TRACING)
/*
* Do run-time initialization of pipe object subsystem.
*/
static int init_pipes_module(struct device *dev)
{
ARG_UNUSED(dev);
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
/*
* Create pool of asynchronous pipe message descriptors.
*
* A dummy thread requires minimal initialization, since it never gets
* to execute. The _THREAD_DUMMY flag is sufficient to distinguish a
* dummy thread from a real one. The threads are *not* added to the
* kernel's list of known threads.
*
* Once initialized, the address of each descriptor is added to a stack
* that governs access to them.
*/
for (int i = 0; i < CONFIG_NUM_PIPE_ASYNC_MSGS; i++) {
async_msg[i].thread.thread_state = _THREAD_DUMMY;
async_msg[i].thread.swap_data = &async_msg[i].desc;
k_stack_push(&pipe_async_msgs, (u32_t)&async_msg[i]);
}
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */
/* Complete initialization of statically defined mailboxes. */
#ifdef CONFIG_OBJECT_TRACING
struct k_pipe *pipe;
for (pipe = _k_pipe_list_start; pipe < _k_pipe_list_end; pipe++) {
SYS_TRACING_OBJ_INIT(k_pipe, pipe);
}
#endif /* CONFIG_OBJECT_TRACING */
return 0;
}
SYS_INIT(init_pipes_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS or CONFIG_OBJECT_TRACING */
void _impl_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{
pipe->buffer = buffer;
pipe->size = size;
pipe->bytes_used = 0;
pipe->read_index = 0;
pipe->write_index = 0;
sys_dlist_init(&pipe->wait_q.writers);
sys_dlist_init(&pipe->wait_q.readers);
SYS_TRACING_OBJ_INIT(k_pipe, pipe);
_k_object_init(pipe);
}
#ifdef CONFIG_USERSPACE
_SYSCALL_HANDLER3(k_pipe_init, pipe, buffer, size)
{
_SYSCALL_OBJ_INIT(pipe, K_OBJ_PIPE);
_SYSCALL_MEMORY_WRITE(buffer, size);
_impl_k_pipe_init((struct k_pipe *)pipe, (unsigned char *)buffer,
size);
return 0;
}
#endif
/**
* @brief Copy bytes from @a src to @a dest
*
* @return Number of bytes copied
*/
static size_t _pipe_xfer(unsigned char *dest, size_t dest_size,
const unsigned char *src, size_t src_size)
{
size_t num_bytes = min(dest_size, src_size);
const unsigned char *end = src + num_bytes;
while (src != end) {
*dest = *src;
dest++;
src++;
}
return num_bytes;
}
/**
* @brief Put data from @a src into the pipe's circular buffer
*
* Modifies the following fields in @a pipe:
* buffer, bytes_used, write_index
*
* @return Number of bytes written to the pipe's circular buffer
*/
static size_t _pipe_buffer_put(struct k_pipe *pipe,
const unsigned char *src, size_t src_size)
{
size_t bytes_copied;
size_t run_length;
size_t num_bytes_written = 0;
int i;
for (i = 0; i < 2; i++) {
run_length = min(pipe->size - pipe->bytes_used,
pipe->size - pipe->write_index);
bytes_copied = _pipe_xfer(pipe->buffer + pipe->write_index,
run_length,
src + num_bytes_written,
src_size - num_bytes_written);
num_bytes_written += bytes_copied;
pipe->bytes_used += bytes_copied;
pipe->write_index += bytes_copied;
if (pipe->write_index == pipe->size) {
pipe->write_index = 0;
}
}
return num_bytes_written;
}
/**
* @brief Get data from the pipe's circular buffer
*
* Modifies the following fields in @a pipe:
* bytes_used, read_index
*
* @return Number of bytes read from the pipe's circular buffer
*/
static size_t _pipe_buffer_get(struct k_pipe *pipe,
unsigned char *dest, size_t dest_size)
{
size_t bytes_copied;
size_t run_length;
size_t num_bytes_read = 0;
int i;
for (i = 0; i < 2; i++) {
run_length = min(pipe->bytes_used,
pipe->size - pipe->read_index);
bytes_copied = _pipe_xfer(dest + num_bytes_read,
dest_size - num_bytes_read,
pipe->buffer + pipe->read_index,
run_length);
num_bytes_read += bytes_copied;
pipe->bytes_used -= bytes_copied;
pipe->read_index += bytes_copied;
if (pipe->read_index == pipe->size) {
pipe->read_index = 0;
}
}
return num_bytes_read;
}
/**
* @brief Prepare a working set of readers/writers
*
* Prepare a list of "working threads" into/from which the data
* will be directly copied. This list is useful as it is used to ...
*
* 1. avoid double copying
* 2. minimize interrupt latency as interrupts are unlocked
* while copying data
* 3. ensure a timeout can not make the request impossible to satisfy
*
* The list is populated with previously pended threads that will be ready to
* run after the pipe call is complete.
*
* Important things to remember when reading from the pipe ...
* 1. If there are writers int @a wait_q, then the pipe's buffer is full.
* 2. Conversely if the pipe's buffer is not full, there are no writers.
* 3. The amount of available data in the pipe is the sum the bytes used in
* the pipe (@a pipe_space) and all the requests from the waiting writers.
* 4. Since data is read from the pipe's buffer first, the working set must
* include writers that will (try to) re-fill the pipe's buffer afterwards.
*
* Important things to remember when writing to the pipe ...
* 1. If there are readers in @a wait_q, then the pipe's buffer is empty.
* 2. Conversely if the pipe's buffer is not empty, then there are no readers.
* 3. The amount of space available in the pipe is the sum of the bytes unused
* in the pipe (@a pipe_space) and all the requests from the waiting readers.
*
* @return false if request is unsatisfiable, otherwise true
*/
static bool _pipe_xfer_prepare(sys_dlist_t *xfer_list,
struct k_thread **waiter,
_wait_q_t *wait_q,
size_t pipe_space,
size_t bytes_to_xfer,
size_t min_xfer,
s32_t timeout)
{
sys_dnode_t *node;
struct k_thread *thread;
struct k_pipe_desc *desc;
size_t num_bytes = 0;
if (timeout == K_NO_WAIT) {
for (node = sys_dlist_peek_head(wait_q); node != NULL;
node = sys_dlist_peek_next(wait_q, node)) {
thread = (struct k_thread *)node;
desc = (struct k_pipe_desc *)thread->base.swap_data;
num_bytes += desc->bytes_to_xfer;
if (num_bytes >= bytes_to_xfer) {
break;
}
}
if (num_bytes + pipe_space < min_xfer) {
return false;
}
}
/*
* Either @a timeout is not K_NO_WAIT (so the thread may pend) or
* the entire request can be satisfied. Generate the working list.
*/
sys_dlist_init(xfer_list);
num_bytes = 0;
while ((thread = (struct k_thread *) sys_dlist_peek_head(wait_q))) {
desc = (struct k_pipe_desc *)thread->base.swap_data;
num_bytes += desc->bytes_to_xfer;
if (num_bytes > bytes_to_xfer) {
/*
* This request can not be fully satisfied.
* Do not remove it from the wait_q.
* Do not abort its timeout (if applicable).
* Do not add it to the transfer list
*/
break;
}
/*
* This request can be fully satisfied.
* Remove it from the wait_q.
* Abort its timeout.
* Add it to the transfer list.
*/
_unpend_thread(thread);
_abort_thread_timeout(thread);
sys_dlist_append(xfer_list, &thread->base.k_q_node);
}
*waiter = (num_bytes > bytes_to_xfer) ? thread : NULL;
return true;
}
/**
* @brief Determine the correct return code
*
* Bytes Xferred No Wait Wait
* >= Minimum 0 0
* < Minimum -EIO* -EAGAIN
*
* * The "-EIO No Wait" case was already checked when the "working set"
* was created in _pipe_xfer_prepare().
*
* @return See table above
*/
static int _pipe_return_code(size_t min_xfer, size_t bytes_remaining,
size_t bytes_requested)
{
if (bytes_requested - bytes_remaining >= min_xfer) {
/*
* At least the minimum number of requested
* bytes have been transferred.
*/
return 0;
}
return -EAGAIN;
}
/**
* @brief Ready a pipe thread
*
* If the pipe thread is a real thread, then add it to the ready queue.
* If it is a dummy thread, then finish the asynchronous work.
*
* @return N/A
*/
static void _pipe_thread_ready(struct k_thread *thread)
{
unsigned int key;
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
if (thread->base.thread_state & _THREAD_DUMMY) {
_pipe_async_finish((struct k_pipe_async *)thread);
return;
}
#endif
key = irq_lock();
_ready_thread(thread);
irq_unlock(key);
}
/**
* @brief Internal API used to send data to a pipe
*/
int _k_pipe_put_internal(struct k_pipe *pipe, struct k_pipe_async *async_desc,
unsigned char *data, size_t bytes_to_write,
size_t *bytes_written, size_t min_xfer,
s32_t timeout)
{
struct k_thread *reader;
struct k_pipe_desc *desc;
sys_dlist_t xfer_list;
unsigned int key;
size_t num_bytes_written = 0;
size_t bytes_copied;
#if (CONFIG_NUM_PIPE_ASYNC_MSGS == 0)
ARG_UNUSED(async_desc);
#endif
key = irq_lock();
/*
* Create a list of "working readers" into which the data will be
* directly copied.
*/
if (!_pipe_xfer_prepare(&xfer_list, &reader, &pipe->wait_q.readers,
pipe->size - pipe->bytes_used, bytes_to_write,
min_xfer, timeout)) {
irq_unlock(key);
*bytes_written = 0;
return -EIO;
}
_sched_lock();
irq_unlock(key);
/*
* 1. 'xfer_list' currently contains a list of reader threads that can
* have their read requests fulfilled by the current call.
* 2. 'reader' if not NULL points to a thread on the reader wait_q
* that can get some of its requested data.
* 3. Interrupts are unlocked but the scheduler is locked to allow
* ticks to be delivered but no scheduling to occur
* 4. If 'reader' times out while we are copying data, not only do we
* still have a pointer to it, but it can not execute until this call
* is complete so it is still safe to copy data to it.
*/
struct k_thread *thread = (struct k_thread *)
sys_dlist_get(&xfer_list);
while (thread) {
desc = (struct k_pipe_desc *)thread->base.swap_data;
bytes_copied = _pipe_xfer(desc->buffer, desc->bytes_to_xfer,
data + num_bytes_written,
bytes_to_write - num_bytes_written);
num_bytes_written += bytes_copied;
desc->buffer += bytes_copied;
desc->bytes_to_xfer -= bytes_copied;
/* The thread's read request has been satisfied. Ready it. */
key = irq_lock();
_ready_thread(thread);
irq_unlock(key);
thread = (struct k_thread *)sys_dlist_get(&xfer_list);
}
/*
* Copy any data to the reader that we left on the wait_q.
* It is possible no data will be copied.
*/
if (reader) {
desc = (struct k_pipe_desc *)reader->base.swap_data;
bytes_copied = _pipe_xfer(desc->buffer, desc->bytes_to_xfer,
data + num_bytes_written,
bytes_to_write - num_bytes_written);
num_bytes_written += bytes_copied;
desc->buffer += bytes_copied;
desc->bytes_to_xfer -= bytes_copied;
}
/*
* As much data as possible has been directly copied to any waiting
* readers. Add as much as possible to the pipe's circular buffer.
*/
num_bytes_written +=
_pipe_buffer_put(pipe, data + num_bytes_written,
bytes_to_write - num_bytes_written);
if (num_bytes_written == bytes_to_write) {
*bytes_written = num_bytes_written;
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
if (async_desc != NULL) {
_pipe_async_finish(async_desc);
}
#endif
k_sched_unlock();
return 0;
}
/* Not all data was copied. */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
if (async_desc != NULL) {
/*
* Lock interrupts and unlock the scheduler before
* manipulating the writers wait_q.
*/
key = irq_lock();
_sched_unlock_no_reschedule();
_pend_thread((struct k_thread *) &async_desc->thread,
&pipe->wait_q.writers, K_FOREVER);
_reschedule_threads(key);
return 0;
}
#endif
struct k_pipe_desc pipe_desc;
pipe_desc.buffer = data + num_bytes_written;
pipe_desc.bytes_to_xfer = bytes_to_write - num_bytes_written;
if (timeout != K_NO_WAIT) {
_current->base.swap_data = &pipe_desc;
/*
* Lock interrupts and unlock the scheduler before
* manipulating the writers wait_q.
*/
key = irq_lock();
_sched_unlock_no_reschedule();
_pend_current_thread(&pipe->wait_q.writers, timeout);
_Swap(key);
} else {
k_sched_unlock();
}
*bytes_written = bytes_to_write - pipe_desc.bytes_to_xfer;
return _pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
bytes_to_write);
}
int _impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
size_t *bytes_read, size_t min_xfer, s32_t timeout)
{
struct k_thread *writer;
struct k_pipe_desc *desc;
sys_dlist_t xfer_list;
unsigned int key;
size_t num_bytes_read = 0;
size_t bytes_copied;
__ASSERT(min_xfer <= bytes_to_read, "");
__ASSERT(bytes_read != NULL, "");
key = irq_lock();
/*
* Create a list of "working readers" into which the data will be
* directly copied.
*/
if (!_pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers,
pipe->bytes_used, bytes_to_read,
min_xfer, timeout)) {
irq_unlock(key);
*bytes_read = 0;
return -EIO;
}
_sched_lock();
irq_unlock(key);
num_bytes_read = _pipe_buffer_get(pipe, data, bytes_to_read);
/*
* 1. 'xfer_list' currently contains a list of writer threads that can
* have their write requests fulfilled by the current call.
* 2. 'writer' if not NULL points to a thread on the writer wait_q
* that can post some of its requested data.
* 3. Data will be copied from each writer's buffer to either the
* reader's buffer and/or to the pipe's circular buffer.
* 4. Interrupts are unlocked but the scheduler is locked to allow
* ticks to be delivered but no scheduling to occur
* 5. If 'writer' times out while we are copying data, not only do we
* still have a pointer to it, but it can not execute until this
* call is complete so it is still safe to copy data from it.
*/
struct k_thread *thread = (struct k_thread *)
sys_dlist_get(&xfer_list);
while (thread && (num_bytes_read < bytes_to_read)) {
desc = (struct k_pipe_desc *)thread->base.swap_data;
bytes_copied = _pipe_xfer(data + num_bytes_read,
bytes_to_read - num_bytes_read,
desc->buffer, desc->bytes_to_xfer);
num_bytes_read += bytes_copied;
desc->buffer += bytes_copied;
desc->bytes_to_xfer -= bytes_copied;
/*
* It is expected that the write request will be satisfied.
* However, if the read request was satisfied before the
* write request was satisfied, then the write request must
* finish later when writing to the pipe's circular buffer.
*/
if (num_bytes_read == bytes_to_read) {
break;
}
_pipe_thread_ready(thread);
thread = (struct k_thread *)sys_dlist_get(&xfer_list);
}
if (writer && (num_bytes_read < bytes_to_read)) {
desc = (struct k_pipe_desc *)writer->base.swap_data;
bytes_copied = _pipe_xfer(data + num_bytes_read,
bytes_to_read - num_bytes_read,
desc->buffer, desc->bytes_to_xfer);
num_bytes_read += bytes_copied;
desc->buffer += bytes_copied;
desc->bytes_to_xfer -= bytes_copied;
}
/*
* Copy as much data as possible from the writers (if any)
* into the pipe's circular buffer.
*/
while (thread) {
desc = (struct k_pipe_desc *)thread->base.swap_data;
bytes_copied = _pipe_buffer_put(pipe, desc->buffer,
desc->bytes_to_xfer);
desc->buffer += bytes_copied;
desc->bytes_to_xfer -= bytes_copied;
/* Write request has been satsified */
_pipe_thread_ready(thread);
thread = (struct k_thread *)sys_dlist_get(&xfer_list);
}
if (writer) {
desc = (struct k_pipe_desc *)writer->base.swap_data;
bytes_copied = _pipe_buffer_put(pipe, desc->buffer,
desc->bytes_to_xfer);
desc->buffer += bytes_copied;
desc->bytes_to_xfer -= bytes_copied;
}
if (num_bytes_read == bytes_to_read) {
k_sched_unlock();
*bytes_read = num_bytes_read;
return 0;
}
/* Not all data was read. */
struct k_pipe_desc pipe_desc;
pipe_desc.buffer = data + num_bytes_read;
pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read;
if (timeout != K_NO_WAIT) {
_current->base.swap_data = &pipe_desc;
key = irq_lock();
_sched_unlock_no_reschedule();
_pend_current_thread(&pipe->wait_q.readers, timeout);
_Swap(key);
} else {
k_sched_unlock();
}
*bytes_read = bytes_to_read - pipe_desc.bytes_to_xfer;
return _pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
bytes_to_read);
}
#ifdef CONFIG_USERSPACE
_SYSCALL_HANDLER6(k_pipe_get,
pipe, data, bytes_to_read, bytes_read_p, min_xfer_p, timeout)
{
size_t *bytes_read = (size_t *)bytes_read_p;
size_t min_xfer = (size_t)min_xfer_p;
_SYSCALL_OBJ(pipe, K_OBJ_PIPE);
_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read));
_SYSCALL_MEMORY_WRITE((void *)data, bytes_to_read);
_SYSCALL_VERIFY(min_xfer <= bytes_to_read);
return _impl_k_pipe_get((struct k_pipe *)pipe, (void *)data,
bytes_to_read, bytes_read, min_xfer,
timeout);
}
#endif
int _impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
size_t *bytes_written, size_t min_xfer, s32_t timeout)
{
__ASSERT(min_xfer <= bytes_to_write, "");
__ASSERT(bytes_written != NULL, "");
return _k_pipe_put_internal(pipe, NULL, data,
bytes_to_write, bytes_written,
min_xfer, timeout);
}
#ifdef CONFIG_USERSPACE
_SYSCALL_HANDLER6(k_pipe_put, pipe, data, bytes_to_write, bytes_written_p,
min_xfer_p, timeout)
{
size_t *bytes_written = (size_t *)bytes_written_p;
size_t min_xfer = (size_t)min_xfer_p;
_SYSCALL_OBJ(pipe, K_OBJ_PIPE);
_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written));
_SYSCALL_MEMORY_READ((void *)data, bytes_to_write);
_SYSCALL_VERIFY(min_xfer <= bytes_to_write);
return _impl_k_pipe_put((struct k_pipe *)pipe, (void *)data,
bytes_to_write, bytes_written, min_xfer,
timeout);
}
#endif
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
void k_pipe_block_put(struct k_pipe *pipe, struct k_mem_block *block,
size_t bytes_to_write, struct k_sem *sem)
{
struct k_pipe_async *async_desc;
size_t dummy_bytes_written;
/* For simplicity, always allocate an asynchronous descriptor */
_pipe_async_alloc(&async_desc);
async_desc->desc.block = &async_desc->desc.copy_block;
async_desc->desc.copy_block = *block;
async_desc->desc.sem = sem;
async_desc->thread.prio = k_thread_priority_get(_current);
(void) _k_pipe_put_internal(pipe, async_desc, block->data,
bytes_to_write, &dummy_bytes_written,
bytes_to_write, K_FOREVER);
}
#endif