/*
 * Copyright (c) 2022 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */


#ifndef ZEPHYR_RTIO_SPSC_H_
#define ZEPHYR_RTIO_SPSC_H_

#include <stdint.h>
#include <stdbool.h>
#include <zephyr/sys/atomic.h>

/**
 * @brief RTIO Single Producer Single Consumer (SPSC) Queue API
 * @defgroup rtio_spsc RTIO SPSC API
 * @ingroup rtio
 * @{
 */

/**
 * @file rtio_spsc.h
 *
 * @brief A lock-free and type safe power of 2 fixed sized single producer
 * single consumer (SPSC) queue using a ringbuffer and atomics to ensure
 * coherency.
 *
 * This SPSC queue implementation works on an array which wraps using a power of
 * two size and uses a bit mask to perform a modulus. Atomics are used to allow
 * single-producer single-consumer safe semantics without locks. Elements are
 * expected to be of a fixed size. The API is type safe as the underlying buffer
 * is typed and all usage is done through macros.
 *
 * An SPSC queue may be declared on a stack or statically and work as intended so
 * long as its lifetime outlives any usage. Static declarations should be the
 * preferred method as stack . It is meant to be a shared object between two
 * execution contexts (ISR and a thread for example)
 *
 * An SPSC queue is safe to produce or consume in an ISR with O(1) push/pull.
 *
 * @warning SPSC is *not* safe to produce or consume in multiple execution
 * contexts.
 *
 * Safe usage would be, where A and B are unique execution contexts:
 * 1. ISR A producing and a Thread B consuming.
 * 2. Thread A producing and ISR B consuming.
 * 3. Thread A producing and Thread B consuming.
 * 4. ISR A producing and ISR B consuming.
 */

/**
 * @private
 * @brief Common SPSC attributes
 *
 * @warning Not to be manipulated without the macros!
 */
struct rtio_spsc {
	/* private value only the producer thread should mutate */
	unsigned long acquire;

	/* private value only the consumer thread should mutate */
	unsigned long consume;

	/* producer mutable, consumer readable */
	atomic_t in;

	/* consumer mutable, producer readable */
	atomic_t out;

	/* mask used to automatically wrap values */
	const unsigned long mask;
};

/**
 * @brief Statically initialize an rtio_spsc
 *
 * @param name Name of the spsc symbol to be provided
 * @param type Type stored in the spsc
 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
 */
#define RTIO_SPSC_INITIALIZER(name, type, sz)	  \
	{ ._spsc = {				  \
		  .acquire = 0,			  \
		  .consume = 0,			  \
		  .in = ATOMIC_INIT(0),		  \
		  .out = ATOMIC_INIT(0),	  \
		  .mask = sz - 1,		  \
	  }					  \
	}

/**
 * @brief Declare an anonymous struct type for an rtio_spsc
 *
 * @param name Name of the spsc symbol to be provided
 * @param type Type stored in the spsc
 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
 */
#define RTIO_SPSC_DECLARE(name, type, sz) \
	struct rtio_spsc_ ## name {	  \
		struct rtio_spsc _spsc;	  \
		type buffer[sz];	  \
	}

/**
 * @brief Define an rtio_spsc with a fixed size
 *
 * @param name Name of the spsc symbol to be provided
 * @param type Type stored in the spsc
 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
 */
#define RTIO_SPSC_DEFINE(name, type, sz)                                                           \
	RTIO_SPSC_DECLARE(name, type, sz) name = RTIO_SPSC_INITIALIZER(name, type, sz);

/**
 * @brief Size of the SPSC queue
 *
 * @param spsc SPSC reference
 */
#define rtio_spsc_size(spsc) ((spsc)->_spsc.mask + 1)

/**
 * @private
 * @brief A number modulo the spsc size, assumes power of 2
 *
 * @param spsc SPSC reference
 * @param i Value to modulo to the size of the spsc
 */
#define z_rtio_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask)

/**
 * @brief Initialize/reset a spsc such that its empty
 *
 * Note that this is not safe to do while being used in a producer/consumer
 * situation with multiple calling contexts (isrs/threads).
 *
 * @param spsc SPSC to initialize/reset
 */
#define rtio_spsc_reset(spsc)                                                                      \
	({                                                                                         \
		(spsc)->_spsc.consume = 0;                                                         \
		(spsc)->_spsc.acquire = 0;                                                         \
		atomic_set(&(spsc)->_spsc.in, 0);                                                  \
		atomic_set(&(spsc)->_spsc.out, 0);                                                 \
	})

/**
 * @brief Acquire an element to produce from the SPSC
 *
 * @param spsc SPSC to acquire an element from for producing
 *
 * @return A pointer to the acquired element or null if the spsc is full
 */
#define rtio_spsc_acquire(spsc)                                                                    \
	({                                                                                         \
		unsigned long idx = atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire;         \
		bool acq = (idx - atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc);          \
		if (acq) {                                                                         \
			(spsc)->_spsc.acquire += 1;                                                \
		}                                                                                  \
		acq ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL;                       \
	})

/**
 * @brief Produce one previously acquired element to the SPSC
 *
 * This makes one element available to the consumer immediately
 *
 * @param spsc SPSC to produce the previously acquired element or do nothing
 */
#define rtio_spsc_produce(spsc)                                                                    \
	({                                                                                         \
		if ((spsc)->_spsc.acquire > 0) {                                                   \
			(spsc)->_spsc.acquire -= 1;                                                \
			atomic_add(&(spsc)->_spsc.in, 1);                                          \
		}                                                                                  \
	})

/**
 * @brief Produce all previously acquired elements to the SPSC
 *
 * This makes all previous acquired elements available to the consumer
 * immediately
 *
 * @param spsc SPSC to produce all previously acquired elements or do nothing
 */
#define rtio_spsc_produce_all(spsc)                                                                \
	({                                                                                         \
		if ((spsc)->_spsc.acquire > 0) {                                                   \
			unsigned long acquired = (spsc)->_spsc.acquire;                            \
			(spsc)->_spsc.acquire = 0;                                                 \
			atomic_add(&(spsc)->_spsc.in, acquired);                                   \
		}                                                                                  \
	})

/**
 * @brief Consume an element from the spsc
 *
 * @param spsc Spsc to consume from
 *
 * @return Pointer to element or null if no consumable elements left
 */
#define rtio_spsc_consume(spsc)                                                                    \
	({                                                                                         \
		unsigned long idx = atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume;        \
		bool has_consumable = (idx != atomic_get(&(spsc)->_spsc.in));                      \
		if (has_consumable) {                                                              \
			(spsc)->_spsc.consume += 1;                                                \
		}                                                                                  \
		has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL;            \
	})

/**
 * @brief Release a consumed element
 *
 * @param spsc SPSC to release consumed element or do nothing
 */
#define rtio_spsc_release(spsc)                                                                    \
	({                                                                                         \
		if ((spsc)->_spsc.consume > 0) {                                                   \
			(spsc)->_spsc.consume -= 1;                                                \
			atomic_add(&(spsc)->_spsc.out, 1);                                         \
		}                                                                                  \
	})

/**
 * @brief Count of consumables in spsc
 *
 * @param spsc SPSC to get item count for
 */
#define rtio_spsc_consumable(spsc)                                                                 \
	({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })

/**
 * @brief Peek at the first available item in queue
 *
 * @param spsc Spsc to peek into
 *
 * @return Pointer to element or null if no consumable elements left
 */
#define rtio_spsc_peek(spsc)                                                                       \
	({                                                                                         \
		unsigned long idx = atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume;        \
		bool has_consumable = (idx != atomic_get(&(spsc)->_spsc.in));                      \
		has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL;            \
	})

/**
 * @brief Peek at the next item in the queue from a given one
 *
 *
 * @param spsc SPSC to peek at
 * @param item Pointer to an item in the queue
 *
 * @return Pointer to element or null if none left
 */
#define rtio_spsc_next(spsc, item)                                                                 \
	({                                                                                         \
		unsigned long idx = ((item) - (spsc)->buffer);                                     \
		bool has_next = z_rtio_spsc_mask(spsc, (idx + 1)) !=                               \
				(z_rtio_spsc_mask(spsc, atomic_get(&(spsc)->_spsc.in)));           \
		has_next ? &((spsc)->buffer[z_rtio_spsc_mask((spsc), idx + 1)]) : NULL;            \
	})

/**
 * @brief Get the previous item in the queue from a given one
 *
 * @param spsc SPSC to peek at
 * @param item Pointer to an item in the queue
 *
 * @return Pointer to element or null if none left
 */
#define rtio_spsc_prev(spsc, item)                                                                 \
	({                                                                                         \
		unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]);     \
		bool has_prev = idx != z_rtio_spsc_mask(spsc, atomic_get(&(spsc)->_spsc.out));     \
		has_prev ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx - 1)]) : NULL;              \
	})

/**
 * @}
 */

#endif /* ZEPHYR_RTIO_SPSC_H_ */
