zbus and Implementing Async Rust API

zbus

As many readers already know, for almost 2 years now I've been developing a Rust crate for D-Bus, called zbus. Since this was my first big Rust project started from scratch, and I worked on it almost entirely in my spare time, progress was rather slow for many months. My perfectionism didn't help, and neither did the fact that implementing a Serde API for the D-Bus format was quite challenging. Then along came my old friend and ex-colleague, Marc-André Lureau, who sped up progress tenfold, and soon after we had the 1.0 release of zbus.

While my original plan (perfectionism again) was for the API to be primarily async, with the synchronous API mostly just a wrapper around it, it was Marc-André who ended up doing most of the work and coming up with a nice high-level API. His use case was primarily synchronous, so we decided to go with the synchronous API first. I still believe that was the right thing to do, since neither of us was familiar with async programming in Rust, and going with the original plan would have delayed the first release by at least another half a year.

This may sound very disappointing to readers who come from a glib programming background, but a purely synchronous, blocking API in a Rust app is not nearly as bad as it would be in a glib+C (or even Vala) app. There is a reason Rust is famous for its fearless concurrency.

Asynchronous API

However, a first-class asynchronous API is still a must if we're serious about our goal of making D-Bus super easy. This is especially important for UI apps, which need an easy way to communicate over D-Bus without blocking the UI, and without having to spawn threads and set up communication channels between them.
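To make that cost concrete, here is a rough sketch of what a UI app has to do with a purely blocking API: run the call on a worker thread and ship the reply back over a channel. The names blocking_call and call_in_background are hypothetical, and the sleep merely simulates a D-Bus round trip; this is not zbus API.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a blocking (synchronous) D-Bus method call.
fn blocking_call(arg: u32) -> u32 {
    thread::sleep(Duration::from_millis(50)); // simulate the round trip
    arg * 2
}

// Runs `blocking_call` on a worker thread and hands back the receiving end
// of a channel, so the calling (UI) thread is never blocked by the call.
fn call_in_background(arg: u32) -> Receiver<u32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the error if the receiver was dropped in the meantime.
        let _ = tx.send(blocking_call(arg));
    });
    rx
}
```

Calling call_in_background(21) returns immediately; a UI would then check the receiver with try_recv() from its main loop. Every blocking call needs this kind of plumbing, which is exactly what an async API spares the app.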

So for many weeks now, I've been working on adding async versions of our synchronous types, starting with the low-level Connection, followed by the client-side proxy, and hopefully soon the service side as well. It's been a very interesting challenge, to say the least. Coming from a Vala background, I found Rust's async/await syntax very familiar.

One of the great things is that I was able to achieve one of my original goals: our existing types are now thin blocking wrappers around their new async siblings, which avoids code duplication. Moreover, thanks to the futures and smol-rs crates, I've so far also been able to keep zbus agnostic of specific async runtimes.
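The wrapper pattern described above can be sketched roughly as follows. AsyncGreeter and Greeter are hypothetical types, not zbus's Connection or Proxy, and the tiny thread-parking block_on exists only to keep the example self-contained; a real crate would more likely use an existing executor such as futures::executor::block_on.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal executor: polls one future on the current thread and parks the
// thread until the future's waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // spurious wakeups just re-poll
        }
    }
}

// Hypothetical async type: all the real logic lives here.
pub struct AsyncGreeter {
    name: String,
}

impl AsyncGreeter {
    pub fn new(name: &str) -> Self {
        AsyncGreeter { name: name.to_string() }
    }

    pub async fn greet(&self, who: &str) -> String {
        format!("{} greets {}", self.name, who)
    }
}

// Blocking sibling: owns the async type and simply blocks on each method,
// so no logic is duplicated between the two APIs.
pub struct Greeter {
    inner: AsyncGreeter,
}

impl Greeter {
    pub fn new(name: &str) -> Self {
        Greeter { inner: AsyncGreeter::new(name) }
    }

    pub fn greet(&self, who: &str) -> String {
        block_on(self.inner.greet(who))
    }

    // Lets callers drop down to the async API when they need it.
    pub fn inner(&self) -> &AsyncGreeter {
        &self.inner
    }
}
```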

Esther loves async Rust code

Pain Points

Having said that, while Rust's async/await is a lot of joy from a user's point of view, implementing a useful async API on top of it isn't a walk in the park. Here are some of the hurdles I bumped into:

Pinning

If you're going to do any async programming in Rust, sooner or later you'll have to learn what this is. It's not at all a hard concept, but what I found especially challenging is the difference between Unpin and !Unpin types, i.e. which one is which? I also kept forgetting why I need to pin a future before polling it, or before awaiting it through a &mut reference (as in a select loop).
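A small sketch may help keep the two straight. The future returned by an async fn is not Unpin, so it has to be pinned (here with Box::pin) before it can be polled by hand or awaited through &mut; an Unpin type such as std::future::Ready can be pinned in place with Pin::new. The NoopWaker and helper names are made up for the example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for polling futures by hand in a demo.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

async fn answer() -> u32 {
    42
}

// Polls any future once. An `async fn` future is !Unpin, so
// `Pin::new(&mut fut)` would not compile; `Box::pin` moves it to the heap,
// where it never moves again, which makes pinning it safe.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

// `std::future::Ready` is Unpin, so it can be pinned in place with Pin::new.
fn poll_unpin_future() -> Poll<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::future::ready(5);
    Pin::new(&mut fut).poll(&mut cx)
}

// `.await` on a future by value pins it for you. Awaiting through `&mut fut`
// (as select-style loops do) requires `fut: Unpin`, hence the Box::pin.
async fn await_by_ref() -> u32 {
    let mut fut = Box::pin(answer());
    (&mut fut).await
}
```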

Implementing Futures based on async API

This one surprised me a lot. I would have thought it would be easy, but it turns out it's not. The reason is the first argument of the Future trait's required poll method:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

Notice how the first argument here is Pin<&mut Self> and not the usual self, &self or &mut self. This is well justified, since things must not move around while a future is incomplete (and the poll method may be called many times before it completes). However, what an async function or block gives you is an opaque type that implements Future but is not pinned, so you can't just do something like:

struct MyFuture;

impl Future for MyFuture {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match some_async_method_returning_string().poll(cx) {
            Poll::Ready(s) => Poll::Ready(format!("string: {}", s)),
            Poll::Pending => Poll::Pending,
        }
    }
}

You'll get:

error[E0599]: no method named `poll` found for opaque type `impl Future` in the current scope
  --> src/lib.rs:13:52
   |
13 |         match some_async_method_returning_string().poll(cx) {
   |                                                    ^^^^ method not found in `impl Future`

While I still don't know how to turn an async call into a Pin<&mut T>, I was told that the futures crate provides enough API that I don't need to, e.g. FutureExt::map. There is also API to convert futures into streams and vice versa. But I had to ask around to figure that out, and it wasn't at all obvious to me.
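For the record, one common std-only way to get that Pin<&mut T> is to create the inner future once, pin it with Box::pin, store it in the struct, and poll it through Pin::as_mut. This also fixes a second, quieter problem in the snippet above: it would create a brand new inner future on every poll. A minimal sketch, where some_async_method_returning_string is a hypothetical stub and block_on is a toy executor for the example only:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-in for the async method from the example above.
async fn some_async_method_returning_string() -> String {
    "hello".to_string()
}

struct MyFuture {
    // Created once and pinned on the heap, so the same inner future is
    // polled on every call to `poll`.
    inner: Pin<Box<dyn Future<Output = String>>>,
}

impl MyFuture {
    fn new() -> Self {
        MyFuture {
            inner: Box::pin(some_async_method_returning_string()),
        }
    }
}

impl Future for MyFuture {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // MyFuture is Unpin (its Pin<Box<_>> field is Unpin), so Pin<&mut Self>
        // derefs mutably, and `as_mut` yields the Pin<&mut dyn Future> we need.
        match self.inner.as_mut().poll(cx) {
            Poll::Ready(s) => Poll::Ready(format!("string: {}", s)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// Toy executor: parks the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

The price is one heap allocation per MyFuture; FutureExt::map avoids writing the Future impl by hand altogether.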

Async closures

First of all, there are no async closures in Rust yet, and from what I hear it'll be a long time before they're available in stable Rust. In the meantime, a common workaround is for closures to return a Future:

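use std::future::Future;

// Stand-in for any async method returning a String.
async fn some_async_method_returning_string() -> String {
    "hello".to_string()
}

async fn call_async_cb<F, Fut>(func: F) -> String
where
    F: Fn() -> Fut,
    Fut: Future<Output = String>,
{
    func().await
}

async fn pass_async_cb() {
    // The closure itself is synchronous; it returns an async block (a Future).
    let s = call_async_cb(|| async {
        some_async_method_returning_string().await
    })
    .await;

    println!("{}", s);
}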

As you can see, for simple cases like the sample code above, the code isn't very different from how it would look if async closures were a thing. But let's take a slightly more complicated example, where the callback takes a reference as an argument:

async fn call_async_cb<F, Fut>(func: F) -> String
where
    F: Fn(&str) -> Fut,
    Fut: Future<Output = String>,
{
    let w = "world".to_string();
    func(&w).await
}

async fn pass_async_cb() {
    let s = call_async_cb(|w|
        // Also notice the `move` here. Without it we get another error from the compiler.
        async move {
            let s = some_async_method_returning_string().await;

            format!("{} {}", s, w)
        }
    ).await;

    println!("{}", s);
}

which will result in:

error: lifetime may not live long enough
  --> src/main.rs:19:9
   |
18 |       let s = call_async_cb(|w|
   |                              -- return type of closure `impl Future` contains a lifetime `'2`
   |                              |
   |                              has type `&'1 str`
19 | /         async move {
20 | |             let s = some_async_method_returning_string().await;
21 | |
22 | |             format!("{} {}", s, w)
23 | |         }
   | |_________^ returning this value requires that `'1` must outlive `'2`

The only solution I could find for this problem was to pass only owned data to such closures, which isn't ideal at all. A signal handler really shouldn't be getting a Message, only a reference to it. A simple solution here would be to pass an Arc<Message> instead, but that would require converting the entire call chain to use that type. We'll likely be doing exactly that, but it would have been nice not to have to.
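Two sketches of workarounds, assuming Arc<String> stands in for Arc<Message> and that some_async_method_returning_string is a hypothetical stub. The first is the owned-data approach described above. The second is a different, commonly used pattern, not what zbus does: a higher-ranked bound whose callback returns a boxed future tied to the argument's lifetime, which lets the callback keep borrowing at the cost of one allocation per call.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

async fn some_async_method_returning_string() -> String {
    "hello".to_string()
}

// 1. Owned data: the callback receives an Arc, so the returned future owns
//    everything it uses and no lifetime ties it to the caller's stack.
async fn call_owned_cb<F, Fut>(func: F) -> String
where
    F: Fn(Arc<String>) -> Fut,
    Fut: Future<Output = String>,
{
    let w = Arc::new("world".to_string());
    func(Arc::clone(&w)).await
}

async fn pass_owned_cb() -> String {
    call_owned_cb(|w| async move {
        let s = some_async_method_returning_string().await;
        format!("{} {}", s, w)
    })
    .await
}

// 2. Borrowed data: `for<'a>` lets the boxed future borrow the argument for
//    exactly as long as the argument lives.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

async fn call_borrowing_cb<F>(func: F) -> String
where
    F: for<'a> Fn(&'a str) -> BoxFuture<'a, String>,
{
    let w = "world".to_string();
    func(&w).await
}

async fn pass_borrowing_cb() -> String {
    call_borrowing_cb(|w| {
        Box::pin(async move {
            let s = some_async_method_returning_string().await;
            format!("{} {}", s, w)
        })
    })
    .await
}

// Toy executor used only to drive the examples.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```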

Errors from hell

At the time of this writing, the async Proxy API in zbus is slightly broken and I only found out about it after I tried to use it with tokio::select in our company's internal codebase:

98  |         Ok(tokio::spawn(async move {
    |            ^^^^^^^^^^^^ `dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send` cannot be shared between threads safely
    | 
   ::: /home/zeenix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.0.1/src/task/spawn.rs:128:21
    |
128 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: the trait `Sync` is not implemented for `dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send`
    = note: required because of the requirements on the impl of `Sync` for `std::ptr::Unique<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>`
    = note: required because it appears within the type `Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>`
    = note: required because it appears within the type `(&str, Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>)`
    = note: required because of the requirements on the impl of `Sync` for `hashbrown::raw::RawTable<(&str, Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>)>`
    = note: required because it appears within the type `hashbrown::map::HashMap<&str, Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>, RandomState>`
    = note: required because it appears within the type `HashMap<&str, Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>>`
    = note: required because of the requirements on the impl of `Sync` for `async_lock::mutex::MutexGuard<'_, HashMap<&str, Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&async_lock::mutex::MutexGuard<'_, HashMap<&str, Box<dyn FnMut(zbus::Message) -> Pin<Box<dyn futures::Future<Output = std::result::Result<(), zbus::Error>> + std::marker::Send>> + std::marker::Send>>>`

    ***MORE SCARY INFO HERE***

    = note: required because it appears within the type `impl futures::Future`

It's not even immediately obvious whether the issue is a lack of Send or Sync. After some hints from Sebastian Dröge (who, by the way, has been very helpful throughout this whole endeavour), it turns out the culprit is this line, where we have to keep a &HashMap alive across an .await point. Don't worry if you don't get it; I'm not sure I fully comprehend it either. :)

Most importantly, the issue is in zbus code, yet the error only shows up when building the code that uses it, which makes it very unexpected. This runs a bit contrary to the usual experience with Rust, where "if it builds, it works" is not just an empty slogan but, in most cases, actually (and surprisingly) true.
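Here is a reduced, std-only sketch of the mechanism, using hypothetical names (Handler, dispatch, some_io) rather than zbus's actual code. The boxed handlers are Send but not Sync, so a map of them is not Sync, and a shared reference &T is only Send when T is Sync. Any future that keeps such a &HashMap alive across an .await is therefore not Send, and the error only surfaces where something like tokio::spawn demands Send. The sketch sidesteps this by holding a &mut HashMap instead (a &mut T is Send whenever T is Send); adding + Sync to the handler's trait object would be another option. The assert_send helper shows one way a library could catch such a regression in its own tests rather than in its users' code.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Send but NOT Sync, like the boxed handlers in the error above.
type Handler = Box<dyn FnMut(u32) -> u32 + Send>;

// Hypothetical stand-in for some I/O the dispatcher awaits.
async fn some_io() {}

// The only reference held across the `.await` is `&mut HashMap`, and
// `&mut T` is Send whenever `T` is Send, so this future is Send.
// Taking `handlers: &HashMap<String, Handler>` instead would make it !Send.
async fn dispatch(
    handlers: &mut HashMap<String, Handler>,
    name: &str,
    arg: u32,
) -> Option<u32> {
    some_io().await;
    handlers.get_mut(name).map(|h| h(arg))
}

// Compile-time check: fails to build if `T` is not Send.
fn assert_send<T: Send>(_: &T) {}

fn make_handlers() -> HashMap<String, Handler> {
    let mut handlers: HashMap<String, Handler> = HashMap::new();
    handlers.insert("double".to_string(), Box::new(|x: u32| x * 2));
    handlers
}

// Toy executor used only to drive the example.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```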

Conclusion

I still find this experience much more pleasant than doing the same in Vala or C (especially C). Sure, you won't encounter any of these pains in those languages, but you'll be dealing with a lot more real problems at runtime, for years, that stem from the very issues Rust wants you to confront at build time. But let's not digress. This is not supposed to be yet another Rust sermon from yours truly. :)

Can Rust do better here? For sure! That is the hope and the reason for this blog post.
