🌐
Hacker News
news.ycombinator.com › item
Rust does not have to be this hard. Most of the pain here comes ...
Most of the pain here comes from the unholy trifecta: combining async, lifetimes, and dynamic dispatch with trait-object closures, which is indeed very awkward in practice. Async support is incredibly half-baked: it was released as an MVP, but that MVP has not improved notably in almost three years.
🌐
Reddit
reddit.com › r/rust › trouble using tokio mpsc channel
r/rust on Reddit: Trouble using tokio MPSC channel

Main is treated differently, see https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#non-worker-future

Also, tokio tasks are not the same as threads.

🌐
Stack Overflow
stackoverflow.com › questions › 68961504 › non-blocking-recv-on-tokio-mpsc-receiver
rust - Non-blocking recv on Tokio mpsc Receiver - Stack Overflow

After much hunting I found noop_waker in the futures crate which appears to do what I need in combination with poll_recv:

use std::task::{Context, Poll};
use std::time::Duration;

use device_query::{DeviceQuery, DeviceState, Keycode};
use tokio::sync::mpsc::{Receiver, Sender};

pub fn start_device_loop(hz: u32, tx: Sender<Option<Keycode>>, mut rx: Receiver<String>) {
    let poll_wait = 1000 / hz;
    let poll_wait = Duration::from_millis(poll_wait as u64);

    tokio::task::spawn_blocking(move || {
        let dev = DeviceState::new();

        // A waker that does nothing: we never need to be woken,
        // because we poll on our own fixed schedule.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        loop {
            let mut keys = dev.query_keymap();
            match keys.len() {
                0 => (),
                1 => tx.try_send(Some(keys.remove(0))).unwrap(),
                _ => println!("So many keys..."),
            }

            match rx.poll_recv(&mut cx) {
                Poll::Ready(Some(cmd)) => println!("Command '{}' received.", cmd),
                _ => (),
            }
            std::thread::sleep(poll_wait);
        }
    });
}

After digging further through the docs and the tokio source, I can't find anything suggesting poll_recv is meant to be internal-only, or that using it here has any obvious side effects. Letting the process run at 125 Hz, I'm not seeing any excess resource usage either.


I'm leaving the above code for posterity, but since asking this question the try_recv method has been added to Receivers, making this all much cleaner.

Answer from superstator on stackoverflow.com
🌐
Stack Overflow
stackoverflow.com › questions › 74906944 › why-the-channel-in-the-example-code-of-tokiosyncnotify-is-a-mpsc
rust - Why the channel in the example code of tokio::sync::Notify ...

Quote from the documentation you linked:

If you have two calls to recv and two calls to send in parallel, the following could happen:

  1. Both calls to try_recv return None.
  2. Both new elements are added to the vector.
  3. The notify_one method is called twice, adding only a single permit to the Notify.
  4. Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.

Replace try_recv with self.values.lock().unwrap().pop_front() in our case; the rest of the explanation stays identical.

The third point is the important one: multiple calls to notify_one only result in a single token if no thread is waiting yet. And there is a short time window in which multiple threads may have already checked for the existence of an item but aren't waiting yet.

Answer from Finomnis on stackoverflow.com
🌐
Rust-lang
users.rust-lang.org › help
Use `std::sync::mpsc::channel` with `tokio::select!`? - help - ...
Hi, I finally solved my previous issue, i.e. sending messages from a purely synchronous function inside a tokio runtime: just use tokio::runtime::Handle::spawn_blocking together with Sender::blocking_send. So this is not necessary now. Answer from linrongbin16 on users.rust-lang.org
🌐
Servo
doc.servo.org › tokio › sync › mpsc › index.html
tokio::sync::mpsc - Rust
A multi-producer, single-consumer queue for sending values between asynchronous tasks.
🌐
Stack Overflow
stackoverflow.com › questions › 78359643 › how-can-i-close-a-tokio-mpsc-channel-on-the-sender-side-with-multiple-senders
rust - How can I close a Tokio mpsc channel on the sender side ...
I am building a simple crawler. The idea was to spawn a new task every time a new URL is found and return all the links found in the page via an mpsc channel, but I ended up at a standstill. The core ...
🌐
Stack Overflow
stackoverflow.com › questions › 75921449 › using-a-stdsyncmpscchannel-to-send-data-from-an-async-task-to-sync-world
rust - Using a std::sync::mpsc::channel to send data from an async ...

The hard way

Never hold a reference to the channel or the worker between await points. For example, take self instead of &self in your code.

The easy way

Use an async channel. tokio's channels have a blocking_recv() method you can use in synchronous contexts.

Or use a different channel implementation, such as crossbeam's, whose Sender is Sync.

Answer from Chayim Friedman on stackoverflow.com
🌐
Crates
crates.io › crates › crossfire
crates.io: Rust Package Registry
🌐
Rust-lang
users.rust-lang.org › help
Sharing / Sending Data between Tasks within an Axum Framework - ...
May 8, 2023 - Hi, I am trying to create a micro-service in Rust. Currently, I am using Axum. This micro-service actually only takes one type of request (a POST request with JSON). However, this service is extremely IO intensive, as in I am web-scraping from one particular source, which would mean I'd have to ...
🌐
Oida
oida.dev › rust-tokio-guide › channels
Tokio: Channels
This is a long section. Here are some anchors to jump to the respective part:
🌐
Rust-lang
users.rust-lang.org › t › how-to-connect-a-mpsc-tokio-channel-with-a-fallible-stream-consumer › 64256
How to connect a MPSC tokio channel with a fallible stream consumer?
July 10, 2021 - I have a bi-directional stream gRPC endpoint (i.e. GCP pub/sub streaming_pull). The inbound stream is a stream of pub/sub messages and the outbound stream is a stream of ack ids. In my current implementation ack ids are sent to a MPSC tokio channel and I wrap the receiver into a ReceiverStream ...
🌐
Reddit
reddit.com › r/rust › how to connect a mpsc tokio channel with a fallible stream consumer?
r/rust on Reddit: How to connect a MPSC tokio channel with a fallible ...

Although this question did not interest many people, I posted a solution to my own question here. I suspect it may be of interest to someone in the future.

🌐
Docs
docs.rs › tokio › latest › tokio › sync › mpsc › struct.Receiver.html
Receiver in tokio::sync::mpsc - Rust
Receives values from the associated Sender. Instances are created by the channel function.
🌐
Stack Overflow
stackoverflow.com › questions › 76828097 › how-to-process-async-jobs-in-queue-in-rust
How to process async jobs in Queue in Rust? - Stack Overflow

Just spawn a single task to do the processing and use channels to communicate with it:

use axum::{extract::Path, routing::get, Router};

use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Create a channel to send requests to the processing task
    let (tx, rx) = mpsc::channel(32);
    // Spawn a task to do all the processing. Since this is a single
    // task, all processing will be done sequentially.
    tokio::spawn(async move { process(rx).await });

    // Pass the tx channel along to the GET handler so that it can
    // send requests to the processing task
    let app = Router::new().route(
        "/sleep/:id",
        get(move |path| sleep_and_print(path, tx.clone())),
    );
    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000));
    tracing::info!("Listening on {}", addr);

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn process(mut rx: mpsc::Receiver<(i32, oneshot::Sender<String>)>) {
    // Receive the next queued request
    while let Some((timer, tx)) = rx.recv().await {
        // Process the request
        start_timer_send_json(timer).await;
        // Send back the result
        if let Err(e) = tx.send(format!("{{\"timer\": {}}}", timer)) {
            println!("{:?}", e);
        }
    }
}

async fn sleep_and_print(
    Path(timer): Path<i32>,
    tx: mpsc::Sender<(i32, oneshot::Sender<String>)>,
) -> String {
    // Create a one-shot channel to get the result
    let (otx, orx) = oneshot::channel();
    // Send our request to the processing task
    tx.send((timer, otx)).await.unwrap();
    // Wait for the processing result
    orx.await.unwrap()
}

async fn start_timer_send_json(timer: i32) {
    println!("Start timer {}.", timer);

    tokio::time::sleep(Duration::from_secs(300)).await;

    println!("Timer {} done.", timer);
}
Answer from Jmb on stackoverflow.com
🌐
Reddit
reddit.com › r/rust › new tokio blog post: what's new in axum 0.6.0-rc.1
r/rust on Reddit: New Tokio blog post: What's new in axum 0.6.0-rc.1
August 23, 2022 - 305 votes, 38 comments. 319K subscribers in the rust community. A place for all things related to the Rust programming language—an open-source…
🌐
Reddit
reddit.com › r/rust › new tokio blog post: announcing axum 0.7
r/rust on Reddit: New Tokio blog post: Announcing axum 0.7
November 27, 2023 - 459 votes, 104 comments. 322K subscribers in the rust community. A place for all things related to the Rust programming language—an open-source…