Somehow I’m never satisfied when a program only runs on one computer. The reassuring feeling of connectedness, the thrill of discovering who else is there on the network, the fear of network instability and the insanity of trying to establish a coherent view of the world on multiple machines are both a source of joy and despair that keep me hooked. But I digress…

In the previous article, we looked at building a blinking Raspberry PI with Rust to help cat owners think about cleaning the cat litter box (and avoid death stares from their feline companions). In this article, we’ll take things a step further and expand the Reminder PIs to a network, allowing us to spread them all over the house so that forgetting about this daily cat owner duty becomes an almost impossible thing to do. In order to achieve this, we will embrace the principles of Fearless Concurrency made possible by Rust’s memory management paradigm. More specifically, we will:

  • have a look at the design of the networked system
  • explore the message passing paradigm between threads with channels
  • look at shared ownership between threads for graceful shutdown
  • reflect on the design decisions, the tradeoffs of the approach and the meaning of life

Cluster of reminder PIs, ready to be dispatched everywhere in the house

Let’s go!

Designing a simple clustered application

Any distributed system needs the following building blocks in order to operate:

  • discovery: a way to discover other member nodes
  • transport: a way to communicate (the pipes)
  • protocol: a common language to understand each other
  • consensus: a way to agree on the state of things

At first, this definition may look somewhat elaborate, so let’s take some time and talk about these different aspects to demystify them.

Discovery

In order to talk to other nodes on the network, we need to know about them. There can be various degrees of discovery in a distributed system, ranging from a simple, hard-coded list of IP addresses on each node to full-fledged group membership (the foundation of clustered applications). If you want to learn more about this, you can have a look at this Pink Floyd themed talk on group membership.

For this particular system, we’ll keep things simple, but not as simple as having to hard-code the IP addresses. Since I’m using Wi-Fi with a DHCP server and without static leases, that would be a tedious exercise. Fortunately, there’s a relatively simple mechanism for dynamic service discovery on local networks: mDNS with DNS Service Discovery. Each node will simply advertise the cat reminder service on the local network and also continuously look for other nodes that offer this service.

As it so happens, There’s A Rust Crate For That: the mdns-sd crate, which implements exactly this type of discovery and doesn’t depend on async/await (which, for the purpose of this article, is exactly what we need).
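
To give a feel for it, here’s a rough sketch of what advertising and browsing the service with mdns-sd could look like — the service type, instance name, address, port and TXT property below are made up for illustration:

use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};

// Spawn the mDNS daemon that handles advertising and browsing.
let mdns = ServiceDaemon::new().expect("Could not create mDNS daemon");
let service_type = "_cat-reminder._udp.local.";

// Advertise this node's instance of the cat reminder service...
let properties = [("node", "reminder")];
let service = ServiceInfo::new(
    service_type,
    "reminder-node-1",        // instance name
    "reminder-node-1.local.", // host name
    "192.168.1.42",           // this node's IP address
    5300,                     // the port the transport module listens on
    &properties[..],
)
.expect("Could not build service info");
mdns.register(service).expect("Could not register service");

// ...and continuously browse for other nodes that offer it.
let receiver = mdns.browse(service_type).expect("Could not browse for services");
while let Ok(event) = receiver.recv() {
    if let ServiceEvent::ServiceResolved(info) = event {
        println!("Discovered node at {:?}", info.get_addresses());
    }
}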

Transport

For the nodes to exchange data we need a transport medium - the basic plumbing underpinning any kind of networked application. Since we’re fearless (and quite frankly ours isn’t a very high-stakes system) we’ll use the UDP protocol. Each node will act both as a server (to receive and respond to requests) and as a client (to request the latest state when starting up and to inform other nodes that the state has changed locally).

At this point, we could roll our own socket initialization and listening loops. Or we could use the message-io crate, which provides a simple abstraction that lets us focus on messages rather than on the underlying sockets.
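
As a rough sketch (with a made-up port and the actual message handling elided), the listening side with message-io could look something like this:

use message_io::network::{NetEvent, Transport};
use message_io::node;

// Split into a handler (for issuing commands) and a listener (for processing events).
let (handler, listener) = node::split::<()>();
handler
    .network()
    .listen(Transport::Udp, "0.0.0.0:5300")
    .expect("Could not listen on UDP port");

listener.for_each(move |event| match event.network() {
    NetEvent::Message(endpoint, data) => {
        // Decode the incoming bytes and update the local state / send a response here.
        println!("Received {} bytes from {}", data.len(), endpoint.addr());
    }
    _ => (), // Connected / Accepted / Disconnected are not interesting for our UDP use case
});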

Protocol

For the nodes to talk to one another, we need to define a common language, that is to say:

  • a set of messages that make up the language
  • a technical representation that can cross network boundaries

In this application, there are really only two messages that we need: one to ask for the current version of the state (the timestamp) and another one to tell other nodes that the state has changed. Using the serde crate, the messages look like this:

use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use chrono::serde::ts_seconds_option;

#[derive(Serialize, Deserialize)]
pub enum Message {
    RequestState,
    UpdateState(#[serde(with = "ts_seconds_option")] Option<DateTime<Utc>>)
}

Further, we’ll use the bincode crate as the encoder / decoder implementation.
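
A minimal sketch of the round trip, assuming bincode 1.x:

// Encode a message into bytes that can be sent over the wire...
let message = Message::UpdateState(Some(Utc::now()));
let bytes: Vec<u8> = bincode::serialize(&message).expect("Could not encode message");

// ...and decode bytes received from another node back into a Message.
let decoded: Message = bincode::deserialize(&bytes).expect("Could not decode message");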

Consensus

After failure detection, consensus is one of my favorite subject matters in the distributed systems field. It certainly helps with the slow and steady decline into insanity.

That being said, in this application, the state is ideally simple. We only need to agree on a timestamp. Time only moves forward (it increases monotonically). Well, except in that one corner in my basement where it moves backwards, and it’s generally not a good idea to stay in there for too long because you may then be crossing your future self on the stairs back up later on, which is awkward. But for our application, we can safely assume that any new version of the state will be the right one. Problem solved!

Putting it all together

In summary, the networked version of the Reminder application will have the following modules, each spawning a separate thread:

Reminder application modules and their interactions

As you can see, some of the modules need to communicate with one another. Let’s see how we can achieve this with one of the concurrency primitives Rust offers: channels.

Passing messages between threads with channels

Channels make it possible for threads to communicate by message passing. You can think of a channel as a unidirectional pipe that has two ends: one that accepts messages to be sent and one that receives them.

A channel between two threads, with the Sender on one side and the Receiver on the other side

When the reset button is pressed in the main module, we want this information to be shared with the transport module. Similarly, when the reset button is pressed on another node, this information will need to be sent from the transport module to the reminder module.

We need to define the messages to be sent between the modules. Since we’re talking about things that have happened in the past, we’ll define these as events, like so:

use chrono::{DateTime, Utc};

pub enum TransportEvent {
    CleaningTimeReset(DateTime<Utc>)
}

pub enum ReminderEvent {
    CleaningTimeUpdated(DateTime<Utc>)
}

We can then proceed to creating the channels we need: one to send information to the reminder module and one to send information to the transport module:

use std::sync::mpsc::{self, Receiver, Sender};

let (reminder_tx, reminder_rx): (Sender<ReminderEvent>, Receiver<ReminderEvent>) = mpsc::channel();
let (transport_tx, transport_rx): (Sender<TransportEvent>, Receiver<TransportEvent>) = mpsc::channel();

When initializing the two modules, we can then pass the Sender and Receiver as needed:

discovery::run(ip_addr, 5200, transport_tx.clone());
transport::run(ip_addr, 5300, reminder_tx, transport_rx, last_cleaning_time);

When the state is being reset in the reminder module we can then proceed to sending this information over to the transport module:

fn reset_state_if_button_pushed(&mut self) {
    let button_pushed = self.read_button_state().unwrap();
    if button_pushed {
        self.last_cleaning_time = crate::reset_state();
        self.transport_tx.send(TransportEvent::CleaningTimeReset(self.last_cleaning_time))
                         .expect("Could not send updated state to transport module");
    }
}

In the transport module, we can receive incoming messages without blocking by using the try_recv() method, like so:

// inside of a loop
if let Ok(msg) = rx.try_recv() {
    match msg {
        TransportEvent::CleaningTimeReset(new_time) => {
            // ... broadcast this information to the other nodes
        }
    }
}

Channels can have multiple senders

One more thing: channels can have multiple senders. You may have noticed that when initializing the discovery module earlier on, we also cloned the transport_tx sender. This is because we also need to pass this sender to the reminder module later on, which will take ownership of the (original) sender.
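
In miniature, and outside the context of this application, multiple senders look like this:

use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel::<String>();
let tx2 = tx.clone(); // a second sender for the same channel

thread::spawn(move || tx.send(String::from("hello from thread 1")).unwrap());
thread::spawn(move || tx2.send(String::from("hello from thread 2")).unwrap());

// Both messages end up at the single receiving end.
for message in rx.iter().take(2) {
    println!("{message}");
}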

Using shared ownership for graceful shutdown

Rust offers another way for multiple threads to exchange information: shared state. In many programming languages, mixing shared mutable state and threads can lead to unspeakable abominations and be difficult to work with because of all kinds of potential concurrency bugs. However, Rust’s ownership model happens to help avoid many of these concurrency issues at compile-time rather than at runtime.

One issue with our multi-threaded application is that when it is stopped, its threads are simply terminated, like that. In cold blood. Without the time to say goodbye or unwind any in-flight processing. For example, the LED strips will still be on after the application has been stopped.

In order to fix this, we’ll make sure that we have a way to unwind things when one of the system signals SIGINT, SIGTERM or SIGQUIT is received. The signal-hook crate helps with achieving this. It allows us to register the handling of signals via the register function:

pub fn register(signal: c_int, flag: Arc<AtomicBool>) -> Result<SigId, Error> {
    // ...
}

What’s interesting in this signature is the flag we need to provide (which will be set to true when a signal has been received): the Arc<AtomicBool>. Let’s dissect that.

Concurrency primitives for multithreaded access

An AtomicBool is a concurrency primitive which allows us to read or write a boolean in a thread-safe manner. Atomics aren’t anything specific to Rust; they exist in many other programming languages and are a way to manipulate variables safely from multiple threads. To learn (a lot) more about atomics in Rust check out Rust Atomics and Locks.
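
In its simplest form, an AtomicBool is read and written like this:

use std::sync::atomic::{AtomicBool, Ordering};

let flag = AtomicBool::new(false);
flag.store(true, Ordering::SeqCst);   // write the value atomically
assert!(flag.load(Ordering::SeqCst)); // read it back atomically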

An Arc<T> is an Atomically Reference Counted pointer. It is a smart pointer that tracks references to the same value in a multithreaded environment. But why do we need this? Rust’s ownership rules state that there can be only one owner of a value at any given time. However, in our current case we need the AtomicBool to be owned in multiple places:

let shutdown_flag = AtomicBool::new(false);
signal_hook::flag::register(signal_hook::consts::SIGTERM, shutdown_flag).unwrap(); // <-- here
signal_hook::flag::register(signal_hook::consts::SIGINT, shutdown_flag).unwrap(); // <-- here
signal_hook::flag::register(signal_hook::consts::SIGQUIT, shutdown_flag).unwrap(); // <-- here

discovery::run(ip_addr, 5200, transport_tx.clone(), shutdown_flag); // <-- here
transport::run(ip_addr, 5300, reminder_tx, transport_rx, last_cleaning_time, shutdown_flag); // <-- here

let mut reminder = Reminder { chip, controller, reminder_rx, transport_tx, last_cleaning_time, is_strip_on: false };
reminder.run(shutdown_flag); // <-- here

The code above would not compile. In the first call to register, we pass ownership of the shutdown_flag, so any subsequent uses will not be allowed by the compiler as we’d be attempting to use a value that has been moved.

At this point, if this were a single-threaded environment, we could use the simple reference-counting smart pointer Rc<T>. This smart pointer makes it possible to have multiple owners for the same (wrapped) value. It works by counting how many references there are to the value, so that when the reference count drops to zero, the value can be safely removed from memory.
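
As a small illustration of Rc<T> in a single-threaded context:

use std::rc::Rc;

let value = Rc::new(42);
let another_owner = Rc::clone(&value); // both handles own the same allocation
assert_eq!(Rc::strong_count(&value), 2);

drop(another_owner);                   // dropping one owner decreases the count
assert_eq!(Rc::strong_count(&value), 1);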

Given that both the discovery and the transport module spawn a new thread, we need a thread-safe alternative, which Arc<AtomicBool> provides.

Our code becomes:

let shutdown_flag = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGTERM, shutdown_flag.clone()).unwrap();
signal_hook::flag::register(signal_hook::consts::SIGINT, shutdown_flag.clone()).unwrap();
signal_hook::flag::register(signal_hook::consts::SIGQUIT, shutdown_flag.clone()).unwrap();

discovery::run(ip_addr, 5200, transport_tx.clone(), shutdown_flag.clone());
transport::run(ip_addr, 5300, reminder_tx, transport_rx, last_cleaning_time, shutdown_flag.clone());

let mut reminder = Reminder { chip, controller, reminder_rx, transport_tx, last_cleaning_time, is_strip_on: false };
reminder.run(shutdown_flag.clone());

Note how the shutdown_flag is clone()d in all of its uses. Cloning an Arc increases the reference count rather than copying the underlying value.

In summary, in order to share a boolean value safely across multiple threads, we use:

  • an AtomicBool to be able to set and read the value in a thread-safe manner
  • and wrap it in an atomic reference counting pointer Arc<AtomicBool> in order to allow multiple ownership across threads
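
For completeness, here’s a sketch of what consuming the flag could look like inside one of the modules’ run loops (the actual loop body of the Reminder is elided and the polling interval is arbitrary):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Hypothetical run loop: poll the shared flag and unwind once a signal flips it to true.
fn run(shutdown_flag: Arc<AtomicBool>) {
    while !shutdown_flag.load(Ordering::Relaxed) {
        // ... one iteration of work: check the button, poll the channels, drive the LEDs
        thread::sleep(Duration::from_millis(100));
    }
    // The flag was set by a signal handler: turn the LED strip off and exit cleanly.
}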

A few thoughts

Building this clustered application was quite fun - I was looking forward to reusing the Raspberry PI cluster used previously for various cluster demonstrations.

I had a first go at building a clustered version of this project a year ago and back then I opted for using an actor library (Rust has many of those). However, I abandoned this approach, partly because I ran out of time and partly because the actor libraries I tried did not play well together with async methods. This time around I opted for using the primitives built into the language, and it turns out that this made the development of this project quite easy (less than one day of work in total). I think that especially for systems with a small number of threads / modules, channels are an excellent solution. With a large number of concurrently running modules that need to communicate with one another, another paradigm would probably be required, because passing the channels around could become cumbersome, and having multiple receivers for the same message could also be necessary, which is something that channels don’t support.

Enabling multiple ownership of a (thread-safe) primitive with Arc<T> is a very powerful tool that the language offers. I think that especially for code that doesn’t have frequent concurrent access patterns (or doesn’t have high performance requirements), this approach allows for a simple and safe way to exchange information between threads.

That’s it for now. Thanks for reading until here!

As usual, the source code of this project is available on GitHub.