Tutorial: Real-time Chat App in Rust with Rocket 🦀⌨️

Hello, amazing people and welcome back to my blog! Today we will build a real-time chat application in Rust using the Rocket framework.

In the past, I've written a tutorial on a chat application, but this one is way different and feels more modern. It also includes a UI whereas my previous one was CLI-based.

Introduction

One of the most popular server backend frameworks in Rust is Rocket, and one of the great things about Rocket is the documentation and examples repository, so I was inspired to create this project after checking this repo.

Let's start:

After you create your Rust project, open Cargo.toml and add Rocket as a dependency. We'll also add the rand crate as a dev dependency, which will be useful later when we implement tests.

[dependencies]
rocket = { version = "0.5", features = ["json"] }

[dev-dependencies]
rand = "0.8"

main.rs

Let's go to main.rs and import rocket. We're importing rocket explicitly with the macro_use attribute so that all the rocket macros are imported globally. This means you can use rocket macros anywhere in your application, which is important because the rocket framework uses macros extensively.

#[macro_use] extern crate rocket;

fn rocket()

After that, the first thing we'll do is create some state and a Rocket server instance.

#[launch]
fn rocket() -> _ {
    rocket::build()
        .manage(channel::<Message>(1024).0)
}

As you can see from the code above, the manage method allows us to add state to our Rocket server instance, which all handlers have access to. The state we want to add is a channel. Rocket uses Tokio as its async runtime, and channels are a way to pass messages between different async tasks.

Let's import the channel types from Tokio:

use rocket::tokio::sync::broadcast::{channel, Sender, error::RecvError};

So, back to fn rocket(): we're creating a channel and specifying what type of messages we'd like to send across it. In this case, that's a Message struct, which we haven't implemented yet. We also pass in a capacity (1024), which is the number of messages the channel can retain at a given time. The return value of the channel function is a tuple containing a sender end and a receiver end. At the end of this call, we write .0 to get the first element of the tuple because we only want to store the sender end in state.

struct Message

Now that we have our state set up, let's implement the Message struct. The Message struct has three fields: a room name, a username, and a message, all of which are strings. Also, some extra validation is added to room and username. The room name can only be up to 29 characters long and username can only be up to 19 characters long.

use rocket::serde::{Serialize, Deserialize};

#[derive(Debug, Clone, FromForm, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, UriDisplayQuery))]
#[serde(crate = "rocket::serde")]
struct Message {
    #[field(validate = len(..30))]
    pub room: String,
    #[field(validate = len(..20))]
    pub username: String,
    pub message: String,
}
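
To make the range bounds concrete, here is a minimal std-only sketch of the check that len(..30) and len(..20) express. Note that fits is a hypothetical helper written for this illustration, not a Rocket API, and it assumes length is measured with str::len (bytes), which equals the character count only for ASCII input.

```rust
use std::ops::RangeTo;

/// Hypothetical helper (not part of Rocket): returns true when the value's
/// length falls inside an exclusive upper bound such as `..30`.
/// Assumption: length is measured in bytes via `str::len`.
fn fits(value: &str, bound: RangeTo<usize>) -> bool {
    // `..30` contains 0 through 29, so a 30-byte value is rejected.
    bound.contains(&value.len())
}
```

With this helper, a 29-character ASCII room name passes fits(name, ..30) and a 30-character one does not, which is where the "up to 29" and "up to 19" limits come from.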

The Message struct also derives a few traits:

  • Debug, so the struct can be printed with debug formatting,

  • Clone, so we can duplicate messages,

  • FromForm, so we can take form data and transform it into a Message struct, and

  • Serialize and Deserialize, so the struct can be serialized and deserialized. Serialization happens via the serde crate, and the #[serde(crate = "rocket::serde")] attribute states that we want to use the serde crate re-exported by Rocket.

The Message struct defines the type of messages we want to send. Our real-time chat application is going to have rooms, users, and messages, so these three fields make sense. Now that we have our message defined, there's only one last thing to do, which is to implement our endpoints. Our chat application needs two endpoints, one endpoint to post messages and another endpoint to receive messages.

Post messages

The route #[post("/message", data = "<form>")] matches POST requests to the /message path and accepts form data.

#[post("/message", data = "<form>")]
fn post(form: Form<Message>, queue: &State<Sender<Message>>) {
    // A send 'fails' if there are no active subscribers. That's okay.
    let _res = queue.send(form.into_inner());
}
use rocket::form::Form;

The handler function accepts two arguments: the form data, which is converted to the Message struct, and the server state, which is a Sender. Inside the body, we send the message to all receivers with queue.send(form.into_inner());

The send method returns a result type because sending a message could fail if there are no receivers. In this project, I don't care about that case, so we're going to ignore it. 🙂
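
If you'd like to see that failure case in isolation, here's a small sketch that uses the standard library's mpsc channel as a stand-in for Tokio's broadcast channel (they are different channel types, so treat this only as an analogy). The helper names sender_only and send_ignoring_errors are made up for this example.

```rust
use std::sync::mpsc::{channel, Sender};

/// Keeps only the sender half of the tuple, the same way the tutorial's
/// `channel::<Message>(1024).0` does. The receiver half is dropped right away.
/// Stand-in: this is std's mpsc channel, not Tokio's broadcast channel.
fn sender_only() -> Sender<String> {
    channel::<String>().0
}

/// Sends a message and deliberately does not treat an error as fatal.
/// Returns whether a receiver was still alive, just to make the outcome visible.
fn send_ignoring_errors(tx: &Sender<String>, text: &str) -> bool {
    let res = tx.send(text.to_string());
    res.is_ok()
}
```

Calling send_ignoring_errors on the result of sender_only() returns false, because nobody is listening. One difference worth keeping in mind: with Tokio's broadcast channel, new receivers can be created later through subscribe(), so a failed send now doesn't mean the sender is useless.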

Receive messages

The route #[get("/events")] handles GET requests to the /events path. The return type, EventStream, is an infinite stream of server-sent events. Server-sent events allow clients to open a long-lived connection with the server, and then the server can send data to the clients whenever it wants. This is similar to WebSockets, except it only works in one direction: the server can send data to clients, but the clients can't send data back to the server over that connection.

#[get("/events")]
async fn events(queue: &State<Sender<Message>>, mut end: Shutdown) -> EventStream![] {
    // ...
}

Unlike the post handler we implemented earlier, this function is prefixed with async because server-sent events are produced asynchronously. The handler takes two arguments: queue, which is our server state, and end, which is of type Shutdown. Shutdown is a future that resolves when our server instance shuts down.

Inside the handler, the first thing we do is call queue.subscribe() to create a new receiver. This will allow us to listen for messages when they're sent down the channel. Next, we use generator syntax to yield an infinite series of server-sent events.

#[get("/events")]
async fn events(queue: &State<Sender<Message>>, mut end: Shutdown) -> EventStream![] {
    let mut rx = queue.subscribe();
    EventStream! {
        loop {
            let msg = select! {
                msg = rx.recv() => match msg {
                    Ok(msg) => msg,
                    Err(RecvError::Closed) => break,
                    Err(RecvError::Lagged(_)) => continue,
                },
                _ = &mut end => break,
            };

            yield Event::json(&msg);
        }
    }
}
use rocket::{State, Shutdown};
use rocket::response::stream::{EventStream, Event};
use rocket::tokio::select;

Inside this infinite loop, the first thing we do is use the select! macro. select! waits on multiple concurrent branches and returns as soon as one of them completes. In this case, we only have two branches. The first one is calling receive on our receiver (rx.recv()), which waits for new messages. When we get a new message, we map it to msg and then match against that. recv returns a result enum:

  • If we get the Ok variant, we simply return the message inside of it (Ok(msg) => msg)

  • If we get the error variant and the error is closed (Err(RecvError::Closed)), that means there are no more senders so we can break out of the infinite loop

  • If we get the error variant and the error is lagged (Err(RecvError::Lagged(_)) => continue), that means our receiver fell too far behind and missed some messages that the channel had already overwritten. In that case, we simply skip to the next iteration of the loop and keep receiving.
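
To get a feel for how the capacity and the lagged case interact, here's a toy, single-threaded model of a bounded broadcast buffer. This is not how Tokio implements its channel; ToyBroadcast, ToyReceiver and ToyRecv are hypothetical names invented for this sketch. It only illustrates the idea that a slow receiver gets a Lagged(n) result once and then resumes from the oldest message still retained.

```rust
use std::collections::VecDeque;

/// Toy model of a bounded broadcast buffer (NOT Tokio's implementation).
struct ToyBroadcast<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Sequence number of the oldest message still held in `buf`.
    oldest: u64,
}

/// A receiver is just a cursor: the sequence number it wants next.
struct ToyReceiver {
    next: u64,
}

/// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq)]
enum ToyRecv<T> {
    Msg(T),
    Lagged(u64),
    Empty,
}

impl<T: Clone> ToyBroadcast<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        ToyBroadcast { buf: VecDeque::new(), capacity, oldest: 0 }
    }

    /// New receivers only see messages sent after they subscribe.
    fn subscribe(&self) -> ToyReceiver {
        ToyReceiver { next: self.oldest + self.buf.len() as u64 }
    }

    /// When the buffer is full, the oldest message is overwritten.
    fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.oldest += 1;
        }
        self.buf.push_back(msg);
    }

    /// Each receiver gets its own clone of the message, which is why the
    /// message type needs to be `Clone`.
    fn recv(&self, rx: &mut ToyReceiver) -> ToyRecv<T> {
        if rx.next < self.oldest {
            // The message this receiver wanted is gone: report how many were
            // missed and jump ahead to the oldest retained one.
            let missed = self.oldest - rx.next;
            rx.next = self.oldest;
            return ToyRecv::Lagged(missed);
        }
        match self.buf.get((rx.next - self.oldest) as usize) {
            Some(msg) => {
                rx.next += 1;
                ToyRecv::Msg(msg.clone())
            }
            None => ToyRecv::Empty,
        }
    }
}
```

With a capacity of 2, sending three messages before the receiver reads anything yields Lagged(1) first, then the second and third messages. That's the situation the continue branch handles: note the gap and carry on.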

The second branch looks a little odd (_ = &mut end => break), but what it does is wait for the Shutdown future to resolve. The Shutdown future resolves when our server is notified to shut down, at which point we can break out of this infinite loop.

Assuming we don't hit one of these break or continue statements, the select macro will return the message we got from our receiver, at which point we can yield a new server-sent event, passing in our message (yield Event::json(&msg);).
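
If you're curious what a yielded event roughly looks like on the wire, here's a std-only sketch of the text/event-stream framing: one data: line per payload line, followed by a blank line. The function sse_data_frame is a hypothetical helper for illustration, and Rocket's exact output may differ in small details such as whitespace after the colon.

```rust
/// Formats a payload as a single server-sent event:
/// one `data:` line per line of payload, then a blank line.
/// Hypothetical helper, not Rocket's implementation.
fn sse_data_frame(payload: &str) -> String {
    let mut frame = String::new();
    for line in payload.split('\n') {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    // A blank line tells the client the event is complete.
    frame.push('\n');
    frame
}
```

On the browser side, EventSource reassembles the data lines and hands the payload to the message handler, where it can be parsed as JSON.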

Yeah! Now both our routes are complete. But we're not yet done! 🤓

Finish fn rocket()

The last thing we need to do is mount these routes. Let's go back to fn rocket(). We'll mount post and events at the root path.

.mount("/", routes![post, events])

Our backend is complete, but we also need a frontend. Before we start adding HTML files, let's mount a handler that will serve static files.

.mount("/", FileServer::from(relative!("static")))
use rocket::fs::{relative, FileServer};

Your rocket() should look like this:

#[launch]
fn rocket() -> _ {
    rocket::build()
        .manage(channel::<Message>(1024).0)
        .mount("/", routes![post, events])
        .mount("/", FileServer::from(relative!("static")))
}

Front-end

Our static files need to be stored in a folder called static, so I'll create that folder. I'm not going to go into detail about the implementation of the frontend as it's out of the scope of this tutorial, but you'll find the full source code at the end.

As a quick overview, the front end consists of a simple HTML page, a couple CSS files, and a vanilla JavaScript file. The JavaScript file uses the EventSource object to establish a new connection with our server and listen for new messages. When a new message is received, it's parsed into JSON and appended to the DOM.

To send messages, a simple post request is dispatched.

if (STATE.connected) {
  fetch("/message", {
    method: "POST",
    body: new URLSearchParams({ room, username, message }),
  }).then((response) => {
    if (response.ok) messageField.value = "";
  });
}
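
Passing a URLSearchParams object as the body sends the three fields as application/x-www-form-urlencoded data, which is the kind of form data our post handler accepts. Here's a rough std-only Rust sketch of that encoding, so you can picture the request body. The helpers form_encode and message_body are hypothetical names written for this illustration; they are not part of Rocket or the frontend.

```rust
/// Encodes one form component in the application/x-www-form-urlencoded style:
/// ASCII letters, digits and `*`, `-`, `.`, `_` pass through unchanged,
/// a space becomes `+`, and every other byte becomes `%XX`.
fn form_encode(input: &str) -> String {
    let mut out = String::new();
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'*' | b'-' | b'.' | b'_' => {
                out.push(byte as char)
            }
            b' ' => out.push('+'),
            other => out.push_str(&format!("%{:02X}", other)),
        }
    }
    out
}

/// Builds a body with the three fields of the Message struct.
fn message_body(room: &str, username: &str, message: &str) -> String {
    format!(
        "room={}&username={}&message={}",
        form_encode(room),
        form_encode(username),
        form_encode(message)
    )
}
```

For example, a room of lobby, a username of alice and the message "hi there!" would travel as room=lobby&username=alice&message=hi+there%21.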

Run the App

Run cargo run in your terminal. Your server will start on localhost, on a port such as 8000. Open two browser windows and navigate to your localhost address (mine is on port 8000) in both. Then pick a different username in the 'user' field for each. Finally, send a message!

If you're following along, you'll see the message appear for user 1 and user 2 instantly. One thing to note is that we didn't implement any type of persistence, so if we refresh one of these web pages, all the messages will be lost. You could consider this a bug or a neat little security feature.

Source code

Find the code here:

Happy Rust Coding! 🦀


👋 Hello, I'm Eleftheria Batsou, Community Manager, developer, public speaker, and content creator.

🥰 If you liked this article, consider sharing it.