🌐
Rust-lang
users.rust-lang.org › help
Making a web server - help - The Rust Programming Language Forum
November 12, 2021 - I want to make a web server using Rust, and am looking around for a suitable crate or crates to help out with the job. I am moderately familiar with the "Book" example. Things I think I need: Support for https. http request parsing. Multi-tasking ( to allow multiple requests to be handled at ...
🌐
Stack Overflow
stackoverflow.com › questions › 69494623 › tokio-mpsc-channel-doesnt-work-across-tasks
rust - Tokio mpsc channel doesn't work across tasks - Stack Overflow

Oooo actually I was wrong. The issue is that when using the task the watcher gets dropped immediately, whereas without the task it is kept alive by the while loop.

Answer from Timmmm on stackoverflow.com
🌐
Tokio
tokio.rs › tokio › tutorial › shared-state
Shared state | Tokio - An asynchronous Rust runtime
Tokio is a runtime for writing reliable asynchronous applications with Rust. It provides async I/O, networking, scheduling, timers, and more.
🌐
Stack Overflow
stackoverflow.com › questions › 53905328 › how-to-close-a-modified-and-executing-futuressyncmpscreceiver-stream
rust - How to close a modified and executing `futures::sync::m...

Not going to lie, I'm with @shepmaster on this one, your question is pretty unclear. That said, it feels like you're trying to do something the mpsc part of futures isn't geared to do.

Anyway. Explanation time.

Whenever you combine/compose streams (or futures!), every single composition method takes self, not &self or &mut self as I think you might have hoped.

The moment you get to this code block of yours:

    {
        let mut maybe_stream = arc.lock().unwrap();
        let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

        let rx = stream.for_each(|_| Ok(()));
        tokio::spawn(rx);
    }

...The stream is extracted from the Arc<Option<Receiver<T>>> when you take() it, and the content of it is replaced by None. You then spawn it on the Tokio reactor, which starts processing this part. This rx is now on the loop, and no longer available to you. Additionally, your maybe_stream now contains None.

After a delay, you then try to take() the content of the Arc<Option<Receiver<T>>> (line A). Since there's now nothing left, you're left with nothing, and therefore there is nothing left to close. Your code errors out.

Instead of passing around a mpsc::Receiver and hoping to destroy it, use a mechanism to stop the stream itself. You can do so yourself or you can use a crate like stream-cancel to do so for you.

The DIY version is here, modified from your code:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
        let c_b_copy = Arc::clone(&circuit_breaker);
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    // We set the CB to true in order to stop processing of the stream
                    circuit_breaker.store(true, Ordering::Relaxed);
                    Ok(())
                }),
        );

        {
            let rx2 = rx.for_each(|e| {
                println!("{:?}", e);
                Ok(())
            });
            tokio::spawn(rx2);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(100))
                .take(100)
                // take_while causes the stream to continue as long as its argument returns a future resolving to true.
                // In this case, we're checking every time if the circuit-breaker we've introduced is false
                .take_while(move |_| {
                    future::ok(
                        c_b_copy.load(Ordering::Relaxed) == false
                    );
                })
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Playground

The added take_while() allows you to operate on either the stream's content, or an outside predicate, to continue or to stop a stream. Note that even though we're using an AtomicBool, we still need the Arc due to 'static lifetime requirements from Tokio.

Reversing the flow

After some discussion in the comments, this solution may be more suited for your use case. I effectively implemented a fan-out stream covered by a circuit breaker. The magic happens here:

impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {

    type Item = S::Item;

    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
        match self.inner.as_mut() {
            Some(ref mut r) => {
                let mut breaker = self.breaker.write().expect("Poisoned lock");
                match breaker.status {
                    false => {
                        let item = r.poll();
                        match &item {
                            &Ok(Async::Ready(Some(ref i))) => {
                                breaker.registry.iter_mut().for_each(|sender| {
                                    sender.try_send(i.clone()).expect("Dead channel");
                                });
                                item
                            },
                            _ => item
                        }
                    },
                    true => Ok(Async::Ready(None))
                }
            }
            _ => {

                let mut breaker = self.breaker.write().expect("Poisoned lock");
                // Stream is over, drop all the senders

                breaker.registry = vec![];
                Ok(Async::Ready(None))
            }
        }
    }
}

If the status indicator is set to false, the above stream is polled; the result is then sent to all listeners. If the result of the poll is Async::Ready(None) (indicating that the stream is finished), all listener channels are closed.

If the status indicator is set to true, all listener channels are closed, and the stream returns Async::Ready(None) (and is dropped from execution by Tokio).

The FanOut object is cloneable, but only the initial instance will do anything.

Answer from Sébastien Renauld on stackoverflow.com
🌐
Reddit
reddit.com › r/rust › hello world server - rust, tokio & axum
r/rust on Reddit: Hello World server - Rust, Tokio & Axum
December 18, 2023 - 294K subscribers in the rust community. A place for all things related to the Rust programming language—an open-source systems language that…
🌐
Docs
docs.rs › axum
axum - Rust
axum is a web application framework that focuses on ergonomics and modularity.
🌐
Reddit
reddit.com › r/rust › using rust, axum, postgresql, and tokio to build a blog
r/rust on Reddit: Using Rust, Axum, PostgreSQL, and Tokio to build ...

I had seen this example on axum.rs Chinese website...have u copied some things? Wink wink

🌐
Hermanradtke
hermanradtke.com › 2017 › 03 › 03 › future-mpsc-queue-with-tokio.html
Future Based mpsc Queue Example with Tokio
March 3, 2017 - It has some subtle differences from the mpsc queue in the std library. I spent some time reading the documentation on https://tokio.rs/, a lot of source code and finally ended up writing a small example program. I have written a decent amount of inline comments with my understanding of how this all works. A complete working example can be found here. I wrote this using Rust ...
🌐
Juxhin
blog.digital-horror.com › blog › how-to-avoid-over-reliance-on-mpsc
Avoiding Over-Reliance on `mpsc` channels in Rust - Juxhin ("Eugene") ...
Thoughts and ideas on how to avoid over relying on `mspc` channels when simpler or more effective patterns can be used.
🌐
Rust-lang
users.rust-lang.org › help
Tokio + std::sync::io::mpsc - future cannot be sent between threads ...
November 8, 2022 - Help... What is this error trying to tell me..? Compiling rust_playground_2022-11-07 v0.1.0 (/home/user/Development/rust_playground_2022-11-07) error: future cannot be sent between threads safely --> src/main.rs:1…
🌐
GitHub
github.com › tokio-rs › axum › discussions › 2381
Axum 0.7.1 uses the CPU more intensively · tokio-rs/axum · ...
Hello! I updated axum from version 0.6.20 to 0.7.1. According to my metrics, the load on the CPU has increased significantly, although the load on the application has remained the same (300-500 rps...
Author: tokio-rs
🌐
Cybernetist
cybernetist.com › home › rust tokio task cancellation patterns
Rust tokio task cancellation patterns - Cybernetist
April 19, 2024 - Update: 19/04/2024: read at the end of the post for more info. I have been trying to pick up Rust again recently. It’s been a bit of a slow burn at the beginning but I think I’m finally starting to feel the compounding effects kicking in. Maybe it’s just my brain playing a trick on me, ...
🌐
Lib
lib.rs › asynchronous
Asynchronous — list of Rust libraries/crates // Lib.rs
Async program flow using techniques like futures, promises, waiting, or eventing.
🌐
Rust-classes
rust-classes.com › chapter_5_4
Importing Crates - Rust Development Classes
Now that you know how to organize your Rust project. Let's look at how 3rd party Crates can be added to the application. Dependencies are, amongst other things, managed By the Cargo.toml file · This is the Cargo.toml file that was generated for our HelloWorld example:
🌐
Rustrepo
rustrepo.com › repo › tokio-rs-axum
Ergonomic and modular web framework built with Tokio, Tower, and ...
tokio-rs/axum, axum axum is a web application framework that focuses on ergonomics and modularity. More information about this crate can be found in the crate docume