use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, lazy, poll_fn, Future};
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Pending(Rc<()>);

impl Future for Pending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

fn pending() -> Pending {
    Pending(Rc::new(()))
}

#[test]
fn run_until_single_future() {
    let mut cnt = 0;

    {
        let mut pool = LocalPool::new();
        let fut = lazy(|_| {
            cnt += 1;
        });
        pool.run_until(fut);
    }

    assert_eq!(cnt, 1);
}

#[test]
fn run_until_ignores_spawned() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    pool.run_until(lazy(|_| ()));
}

#[test]
fn run_until_executes_spawned() {
    let (tx, rx) = oneshot::channel();
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    spawn
        .spawn_local_obj(
            Box::pin(lazy(move |_| {
                tx.send(()).unwrap();
            }))
            .into(),
        )
        .unwrap();
    pool.run_until(rx).unwrap();
}

#[test]
fn run_returns_if_empty() {
    let mut pool = LocalPool::new();
    pool.run();
    pool.run();
}

#[test]
fn run_executes_spawned() {
    let cnt = Rc::new(Cell::new(0));
    let cnt2 = cnt.clone();

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let spawn2 = pool.spawner();

    spawn
        .spawn_local_obj(
            Box::pin(lazy(move |_| {
                spawn2
                    .spawn_local_obj(
                        Box::pin(lazy(move |_| {
                            cnt2.set(cnt2.get() + 1);
                        }))
                        .into(),
                    )
                    .unwrap();
            }))
            .into(),
        )
        .unwrap();

    pool.run();

    assert_eq!(cnt.get(), 1);
}

#[test]
fn run_spawn_many() {
    const ITER: usize = 200;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for _ in 0..ITER {
        let cnt = cnt.clone();
        spawn
            .spawn_local_obj(
                Box::pin(lazy(move |_| {
                    cnt.set(cnt.get() + 1);
                }))
                .into(),
            )
            .unwrap();
    }

    pool.run();

    assert_eq!(cnt.get(), ITER);
}

#[test]
fn try_run_one_returns_if_empty() {
    let mut pool = LocalPool::new();
    assert!(!pool.try_run_one());
}

#[test]
fn try_run_one_executes_one_ready() {
    const ITER: usize = 200;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for _ in 0..ITER {
        spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();

        let cnt = cnt.clone();
        spawn
            .spawn_local_obj(
                Box::pin(lazy(move |_| {
                    cnt.set(cnt.get() + 1);
                }))
                .into(),
            )
            .unwrap();

        spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    }

    for i in 0..ITER {
        assert_eq!(cnt.get(), i);
        assert!(pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
    }
    assert!(!pool.try_run_one());
}

#[test]
fn try_run_one_returns_on_no_progress() {
    const ITER: usize = 10;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
    {
        let cnt = cnt.clone();
        let waker = waker.clone();
        spawn
            .spawn_local_obj(
                Box::pin(poll_fn(move |ctx| {
                    cnt.set(cnt.get() + 1);
                    waker.set(Some(ctx.waker().clone()));
                    if cnt.get() == ITER {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }))
                .into(),
            )
            .unwrap();
    }

    for i in 0..ITER - 1 {
        assert_eq!(cnt.get(), i);
        assert!(!pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
        let w = waker.take();
        assert!(w.is_some());
        w.unwrap().wake();
    }
    assert!(pool.try_run_one());
    assert_eq!(cnt.get(), ITER);
}

#[test]
fn try_run_one_runs_sub_futures() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let inner_spawner = spawn.clone();
    let cnt1 = cnt.clone();
    spawn
        .spawn_local_obj(
            Box::pin(poll_fn(move |_| {
                cnt1.set(cnt1.get() + 1);

                let cnt2 = cnt1.clone();
                inner_spawner
                    .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
                    .unwrap();

                Poll::Pending
            }))
            .into(),
        )
        .unwrap();

    pool.try_run_one();
    assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_returns_if_empty() {
    let mut pool = LocalPool::new();
    pool.run_until_stalled();
    pool.run_until_stalled();
}

#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let cnt1 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
    pool.run_until_stalled();
    assert_eq!(cnt.get(), 1);

    let cnt2 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
    pool.run_until_stalled();
    assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let inner_spawner = spawn.clone();
    let cnt1 = cnt.clone();
    spawn
        .spawn_local_obj(
            Box::pin(poll_fn(move |_| {
                cnt1.set(cnt1.get() + 1);

                let cnt2 = cnt1.clone();
                inner_spawner
                    .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
                    .unwrap();

                Poll::Pending
            }))
            .into(),
        )
        .unwrap();

    pool.run_until_stalled();
    assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_executes_all_ready() {
    const ITER: usize = if cfg!(miri) { 50 } else { 200 };
    const PER_ITER: usize = 3;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for i in 0..ITER {
        for _ in 0..PER_ITER {
            spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();

            let cnt = cnt.clone();
            spawn
                .spawn_local_obj(
                    Box::pin(lazy(move |_| {
                        cnt.set(cnt.get() + 1);
                    }))
                    .into(),
                )
                .unwrap();

            // also add some pending tasks to test if they are ignored
            spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
        }
        assert_eq!(cnt.get(), i * PER_ITER);
        pool.run_until_stalled();
        assert_eq!(cnt.get(), (i + 1) * PER_ITER);
    }
}

#[test]
#[should_panic]
fn nesting_run() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn
        .spawn_obj(
            Box::pin(lazy(|_| {
                let mut pool = LocalPool::new();
                pool.run();
            }))
            .into(),
        )
        .unwrap();

    pool.run();
}

#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn
        .spawn_obj(
            Box::pin(lazy(|_| {
                let mut pool = LocalPool::new();
                pool.run_until_stalled();
            }))
            .into(),
        )
        .unwrap();

    pool.run();
}

#[test]
fn tasks_are_scheduled_fairly() {
    let state = Rc::new(RefCell::new([0, 0]));

    struct Spin {
        state: Rc<RefCell<[i32; 2]>>,
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let mut state = self.state.borrow_mut();

            if self.idx == 0 {
                let diff = state[0] - state[1];

                assert!(diff.abs() <= 1);

                if state[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            state[self.idx] += 1;

            if state[self.idx] >= 100 {
                return Poll::Ready(());
            }

            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();

    spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();

    pool.run();
}

// Tests that the use of park/unpark in user-code has no
// effect on the expected behavior of the executor.
#[test]
fn park_unpark_independence() {
    let mut done = false;

    let future = future::poll_fn(move |cx| {
        if done {
            return Poll::Ready(());
        }
        done = true;
        cx.waker().clone().wake(); // (*)
                                   // some user-code that temporarily parks the thread
        let test = thread::current();
        let latch = Arc::new(AtomicBool::new(false));
        let signal = latch.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            signal.store(true, Ordering::SeqCst);
            test.unpark()
        });
        while !latch.load(Ordering::Relaxed) {
            thread::park();
        }
        Poll::Pending // Expect to be called again due to (*).
    });

    futures::executor::block_on(future)
}

struct SelfWaking {
    wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if *self.wakeups_remaining.borrow() != 0 {
            *self.wakeups_remaining.borrow_mut() -= 1;
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `run_until_stalled`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_run_until_stalled() {
    let wakeups_remaining = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    for _ in 0..3 {
        let wakeups_remaining = Rc::clone(&wakeups_remaining);
        spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
    }

    // This should keep polling until there are no more wakeups.
    pool.run_until_stalled();

    assert_eq!(*wakeups_remaining.borrow(), 0);
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `try_run_one`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_try_run_one() {
    let wakeups_remaining = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    for _ in 0..3 {
        let wakeups_remaining = Rc::clone(&wakeups_remaining);
        spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
    }

    spawner.spawn(future::ready(())).unwrap();

    // The `ready` future should complete.
    assert!(pool.try_run_one());

    // The self-waking futures are each polled once.
    assert_eq!(*wakeups_remaining.borrow(), 7);
}
