Most of the pain here come from the unholy trifactor: combining async, lifetimes and dynamic dispatch with trait object closures; which is indeed very awkward in practice · Async support is incredibly half-baked. It was released as an MVP, but that MVP has not improved notably in almost three years
Main is treated differently, see https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#non-worker-future

Also, tokio tasks are not the same as threads.

After much hunting I found noop_waker in the futures crate which appears to do what I need in combination with poll_recv:

pub fn start_device_loop(hz: u32, tx: Sender<Option<Keycode>>, mut rx: Receiver<String>) {
    let poll_wait = 1000 / hz;
    let poll_wait = Duration::from_millis(poll_wait as u64);

    tokio::task::spawn_blocking(move || {
        let dev = DeviceState::new();

        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            let mut keys = dev.query_keymap();
            match keys.len() {
                0 => (),
                1 => tx.clone().try_send(Some(keys.remove(0))).unwrap(),
                _ => println!("So many keys..."),
            match rx.poll_recv(&mut cx) {
                Poll::Ready(cmd) => println!("Command '{}' received.", cmd.unwrap()),
                _ => ()

After digging through docs and tokio source more I can't find anything that suggests poll_recv is supposed to be an internal-only function or that using it here would have any obvious side effects. Letting the process run at 125hz I'm not seeing any excess resource usage either.

I'm leaving the above code for posterity, but since asking this question the try_recv method has been added to Receivers, making this all much cleaner.

Quote from the documentation you linked:

If you have two calls to recv and two calls to send in parallel, the following could happen:

  1. Both calls to try_recv return None.
  2. Both new elements are added to the vector.
  3. The notify_one method is called twice, adding only a single permit to the Notify.
  4. Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.

Replace try_recv with self.values.lock().unwrap().pop_front() in our case; the rest of the explanation stays identical.

The third point is the important one: Multiple calls to notify_one only result in a single token if no thread is waiting yet. And there is a short time window where it is possible that multiple threads already checked for the existance of an item but aren't waiting yet.

Answer from Finomnis on stackoverflow.com
hi, I finally solved my previous issues, i.e. send messages in pure sync function inside tokio runtime. Just use tokio::runtime::Handle::spawn_blocking and Sender::blocking_send could work. So this is not necessary now. Answer from linrongbin16 on users.rust-lang.org
A multi-producer, single-consumer queue for sending values between asynchronous tasks.
I am building a simple crawler, the idea was to spawn a new task every time a new URL is found and return all the links found in the page via mspc channel, but I ended up in a standstill. The core ...
The hard way

Never hold a reference to the channel or the worker between await points. For example, take self instead of &self in your code.

The easy way

Use an async channel. tokio's channels have a blocking_recv() method you can use in synchronous contexts.

Or use a different channel implementation, such as crossbeam's, whose Sender is Sync.

Answer from Chayim Friedman on stackoverflow.com
Although this question did not interest many people, I posted a solution to my own question here. I suspect it may be of interest to someone in the future.

Just spawn a single task to do the processing and use channels to communicate with it:

use axum::{extract::Path, routing::get, Router};

use tokio::time::Duration;
use tokio::sync::{mpsc, oneshot}

extern crate diesel;
extern crate tracing;

async fn main() {

    // Create a channel to send requests to the processing task
    let (tx, rx) = tokio::sync::channel();
    // Spawn a task to do all the processing. Since this is a single
    // task, all processing will be done sequentially.
    tokio::spawn (async move { process (rx).await; });

    // Pass the tx channel along to the GET handler so that it can
    // send requests to the processing task
    let app = Router::new().route("/sleep/:id", get(move |path| {
        sleep_and_print (path, &tx);
    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000));
    tracing::info!("Listening on {}", addr);


async fn process (rx: mpsc::Receiver<(i32, oneshot::Sender<String>)>) 
    // Receive the next queued request
    while let Ok ((timer, tx)) = rx.recv().await {
        // Process the request
        // Send back the result
        if let Err (e) = tx.send (format!("{{\"timer\": {}}}", timer)) {
            println!("{:?}", e);

async fn sleep_and_print(
    Path(timer): Path<i32>, 
    tx: &mpsc::Sender<(i32, oneshot::Sender<String>)>) -> String 
    // Create a channel to get the result
    let (otx, orx) = oneshot::new();
    // Send our request to the processing task
    tx.send ((timer, otx)).unwrap();
    // Wait for the processing result

async fn start_timer_send_json(timer: i32) {
    println!("Start timer {}.", timer);


    println!("Timer {} done.", timer);
Answer from Jmb on stackoverflow.com
