| /* |
| * Copyright (c) 2022 Intel Corporation. |
| * |
| * SPDX-License-Identifier: Apache-2.0 |
| */ |
| |
| #include <zephyr/spinlock.h> |
| #include <zephyr/rtio/rtio_executor_concurrent.h> |
| #include <zephyr/rtio/rtio.h> |
| #include <zephyr/kernel.h> |
| |
| #include <zephyr/logging/log.h> |
| LOG_MODULE_REGISTER(rtio_executor_concurrent, CONFIG_RTIO_LOG_LEVEL); |
| |
| #define CONEX_TASK_COMPLETE BIT(0) |
| #define CONEX_TASK_SUSPENDED BIT(1) |
| |
| /** |
| * @file |
| * @brief Concurrent RTIO Executor |
| * |
 * The concurrent executor provides a fixed amount of concurrency with
 * minimal overhead, but assumes a small number of concurrent tasks.
| * |
| * Many of the task lookup and management functions in here are O(N) over N |
| * tasks. This is fine when the task set is *small*. Task lookup could be |
| * improved in the future with a binary search at the expense of code size. |
| * |
 * The assumption here is that perhaps only 8-16 concurrent tasks are likely,
 * such that simple short for loops over the task array are reasonably fast.
| * |
 * A maximum of 65K submission queue entries is possible, as the task
 * indexes are 16-bit.
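 *
 * A minimal usage sketch (illustrative only; the macro names below are
 * assumed from the RTIO API of the same era and may differ):
 *
 * @code{.c}
 * // Executor with up to 4 concurrent tasks (a power of two, matching the
 * // mask-based task indexing) and a context with an 8-entry SQ and CQ.
 * RTIO_EXECUTOR_CONCURRENT_DEFINE(con_exec, 4);
 * RTIO_DEFINE(ez_io, (struct rtio_executor *)&con_exec, 8, 8);
 *
 * // Queue SQEs (e.g. via rtio_sqe_acquire() and rtio_sqe_prep_read()),
 * // then hand them to this executor and wait for one completion.
 * rtio_submit(&ez_io, 1);
 * @endcode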
| */ |
| |
| /** |
 * Check whether a free task slot is available
| */ |
| static bool conex_task_free(struct rtio_concurrent_executor *exc) |
| { |
| return (exc->task_in - exc->task_out) < (exc->task_mask + 1); |
| } |
| |
| /** |
 * Get the next free task index (the caller must check conex_task_free() first)
| */ |
| static uint16_t conex_task_next(struct rtio_concurrent_executor *exc) |
| { |
| uint16_t task_id = exc->task_in; |
| |
| exc->task_in++; |
| return task_id; |
| } |
| |
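/**
 * Find the task id currently associated with the given SQE, O(n) over tasks
 */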
| static uint16_t conex_task_id(struct rtio_concurrent_executor *exc, |
| const struct rtio_sqe *sqe) |
| { |
| uint16_t task_id = exc->task_out; |
| |
| for (; task_id < exc->task_in; task_id++) { |
| if (exc->task_cur[task_id & exc->task_mask] == sqe) { |
| break; |
| } |
| } |
| return task_id; |
| } |
| |
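/**
 * Consume and release the oldest chain of SQEs from the submission queue
 */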
| static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc) |
| { |
| struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); |
| |
| while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) { |
| rtio_spsc_release(r->sq); |
| sqe = rtio_spsc_consume(r->sq); |
| } |
| |
| rtio_spsc_release(r->sq); |
| } |
| |
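/**
 * Sweep up completed tasks in submission order, releasing their SQE chains
 */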
| static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc) |
| { |
	/* Sweep up completed tasks in order, stopping at the first one still in flight */
| for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { |
| if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) { |
| LOG_INF("sweeping oldest task %d", task_id); |
| conex_sweep_task(r, exc); |
| exc->task_out++; |
| } else { |
| break; |
| } |
| } |
| } |
| |
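/**
 * Resume all suspended tasks in order by submitting each chain head to its iodev
 */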
| static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc) |
| { |
	/* Resume suspended tasks in order, oldest first */
| for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { |
| if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) { |
| LOG_INF("resuming suspended task %d", task_id); |
			exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
			rtio_iodev_submit(exc->task_cur[task_id & exc->task_mask], r);
| } |
| } |
| } |
| |
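/**
 * Sweep up completed tasks, then resume any that remain suspended
 */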
| static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc) |
| { |
| conex_sweep(r, exc); |
| conex_resume(r, exc); |
| } |
| |
| /** |
 * @brief Submit submissions to the concurrent executor
| * |
| * @param r RTIO context |
| * |
| * @retval 0 Always succeeds |
| */ |
| int rtio_concurrent_submit(struct rtio *r) |
| { |
| LOG_INF("submit"); |
| |
| struct rtio_concurrent_executor *exc = |
| (struct rtio_concurrent_executor *)r->executor; |
| struct rtio_sqe *sqe; |
| struct rtio_sqe *last_sqe; |
| k_spinlock_key_t key; |
| |
| key = k_spin_lock(&exc->lock); |
| |
	/* If nothing has been submitted before, peek at the first item;
	 * otherwise start back up where the last submit call left off.
	 */
| if (exc->last_sqe == NULL) { |
| sqe = rtio_spsc_peek(r->sq); |
| } else { |
| /* Pickup from last submit call */ |
| sqe = rtio_spsc_next(r->sq, exc->last_sqe); |
| } |
| |
| last_sqe = sqe; |
| while (sqe != NULL && conex_task_free(exc)) { |
| LOG_INF("head SQE in chain %p", sqe); |
| |
| /* Get the next task id if one exists */ |
| uint16_t task_idx = conex_task_next(exc); |
| |
| LOG_INF("setting up task %d", task_idx); |
| |
		/* Set up the task: record the chain head, masked into the
		 * fixed-size task arrays, and mark it suspended.
		 */
		exc->task_cur[task_idx & exc->task_mask] = sqe;
		exc->task_status[task_idx & exc->task_mask] = CONEX_TASK_SUSPENDED;
| |
| LOG_INF("submitted sqe %p", sqe); |
		/* Walk to the last SQE (tail) of the current chain */
| while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) { |
| sqe = rtio_spsc_next(r->sq, sqe); |
| } |
| |
| LOG_INF("tail SQE in chain %p", sqe); |
| |
| last_sqe = sqe; |
| |
		/* sqe is the tail of this chain; advance past it to the next chain */
| sqe = rtio_spsc_next(r->sq, sqe); |
| } |
| |
	/* Out of available task slots; further submissions must wait until
	 * others complete. Note the first pending SQE, which may be NULL if
	 * nothing is pending.
	 */
| exc->pending_sqe = sqe; |
| |
| /** |
| * Run through the queue until the last item |
| * and take not of it |
| */ |
| while (sqe != NULL) { |
| last_sqe = sqe; |
| sqe = rtio_spsc_next(r->sq, sqe); |
| } |
| |
| /* Note the last sqe for the next submit call */ |
| exc->last_sqe = last_sqe; |
| |
| /* Resume all suspended tasks */ |
| conex_resume(r, exc); |
| |
| k_spin_unlock(&exc->lock, key); |
| |
| return 0; |
| } |
| |
| /** |
| * @brief Callback from an iodev describing success |
| */ |
| void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) |
| { |
| struct rtio_sqe *next_sqe; |
| k_spinlock_key_t key; |
| struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; |
| |
	/* An interrupt may occur during spsc_acquire and break the SPSC
	 * contract, so take the spin lock around this section: it masks
	 * further interrupts on this core and keeps another core from
	 * working in here concurrently.
	 *
	 * This can and should be broken up into a few sections with a try
	 * lock around the sweep and resume.
	 */
| key = k_spin_lock(&exc->lock); |
| |
| rtio_cqe_submit(r, result, sqe->userdata); |
| |
| /* Determine the task id : O(n) */ |
| uint16_t task_id = conex_task_id(exc, sqe); |
| |
| if (sqe->flags & RTIO_SQE_CHAINED) { |
| next_sqe = rtio_spsc_next(r->sq, sqe); |
| |
| rtio_iodev_submit(next_sqe, r); |
| |
		exc->task_cur[task_id & exc->task_mask] = next_sqe;
	} else {
		exc->task_status[task_id & exc->task_mask] |= CONEX_TASK_COMPLETE;
| } |
| |
| /* Sweep up unused SQEs and tasks, retry suspended tasks */ |
| /* TODO Use a try lock here and don't bother doing it if we are already |
| * doing it elsewhere |
| */ |
| conex_sweep_resume(r, exc); |
| |
| k_spin_unlock(&exc->lock, key); |
| } |
| |
| /** |
| * @brief Callback from an iodev describing error |
| */ |
| void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result) |
| { |
| struct rtio_sqe *nsqe; |
| k_spinlock_key_t key; |
| struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; |
| |
	/* Another interrupt (and SQE completion) may occur during spsc_acquire
	 * and break the SPSC contract, so take the spin lock around this
	 * section: it masks further interrupts on this core and keeps another
	 * core from working in here concurrently.
	 *
	 * This can and should be broken up into a few sections with a try
	 * lock around the sweep and resume.
	 */
| key = k_spin_lock(&exc->lock); |
| |
| rtio_cqe_submit(r, result, sqe->userdata); |
| |
| /* Determine the task id : O(n) */ |
| uint16_t task_id = conex_task_id(exc, sqe); |
| |
	/* Fail the remaining SQEs in the chain */
	if (sqe->flags & RTIO_SQE_CHAINED) {
		nsqe = rtio_spsc_next(r->sq, sqe);
		while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
			rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
			nsqe = rtio_spsc_next(r->sq, nsqe);
		}

		/* The tail of the chain also needs a completion so waiters
		 * are not left hanging on it.
		 */
		if (nsqe != NULL) {
			rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
		}
	}
| |
| /* Task is complete (failed) */ |
	exc->task_status[task_id & exc->task_mask] |= CONEX_TASK_COMPLETE;
| |
| conex_sweep_resume(r, exc); |
| |
| k_spin_unlock(&exc->lock, key); |
| } |