ZeroMQ Communication Between Python And Rust
programming, rust, python
I wanted to get a simple message passing going on between Python and Rust. One that would be quick and easy to set up and learn (I’ve never done anything like that before). In the end, I picked up ZeroMQ and it was pretty neat.
Here’s what I wanted to happen: have a Rust “server” that receives messages from a single Python “client”, responds to the client after a bit of a delay and crashes randomly.
The client would send messages to the server, receive the responses and figure out when a server has crashed.
And when you start them in either order, they wait until the other part is connected before starting sending/receiving messages.
Installing ZeroMQ
Unfortunately (though it makes sense), you need to install the underlying ZeroMQ library (as opposed to say using the Unix sockets or TCP directly where you can rely on your OS/standard library).
On Fedora 24 it’s:
sudo dnf install zeromq-devel
The server in Python
I wrote both the server and client in Python first because it felt easier than juggling two separate languages and bindings in addition to learning ZeroMQ at the same time.
It turns out to be quite straightforward.
The ZeroMQ page recommends the pyzmq
library so I did that:
pip install --user pyzmq
And then the server is just:
#!/usr/bin/env python
import random
import time
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("ipc:///tmp/dose-response.ipc")
handshake = socket.recv()
if handshake == 'READY':
socket.send('READY')
while True:
message = socket.recv()
# Crash 10% of the time
if random.random() < 0.1:
assert False, "Oh Noes!"
time.sleep(0.016)
socket.send("Hello World!")
if "quit" in message:
break
socket.close()
context.term()
(regular readers might notice the subtle hints and wonder whether this is related to Dose Response. It is.)
We set up a ZerqMQ response socket (zmq.REP
) and bind it to a unix socket at /tmp/dose-response.ipc
. You could easily switch to TCP by passing something like "tcp://*:25834"
to socket.bind
or "inproc://whatever"
for communication within a single process.
Then we do a highly sophisticated handshake where the client sends READY
and the server responds with READY
.
socket.recv
blocks, waiting for the next message.
Next we loop, waiting for messages from the client, stalling for 16 milliseconds, responding back and crashing randomly (as servers do).
If we receive quit
, we shut down cleanly.
The client in Python
The client is a little more complicated, but not much. It mostly stems from having to detect a server crash.
#!/usr/bin/env python
import time
import zmq
REQUEST_TIMEOUT_MS = 3000
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("ipc:///tmp/dose-response.ipc")
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
print "Sending handshake"
socket.send('READY')
handshake = socket.recv()
if handshake == 'READY':
print "Connected to the server"
commands = [str(i) for i in range(10)]
commands.append("quit")
for command in commands:
print "Sending command {}".format(command)
socket.send(command)
socks = dict(poll.poll(REQUEST_TIMEOUT_MS))
if socks.get(socket) == zmq.POLLIN:
message = socket.recv()
print "Received reply: {}".format(message)
else:
print "ERROR: Timed out waiting for a response"
break
else:
print "Unexpected handshake reply: {}".format(handshake)
socket.close()
context.term()
The setup is basically identical, but the socket is of type zmq.REQ
because this code is doing the requesting.
As far as I could find, there’s no built-in way to figure out whether the connection was closed. So we want to set a timeout on the server replies and treat that as a closed connection.
Now, socket.recv
blocks and there’s no way to do something like socket.recv(timeout=…)
. We have do a little setup to support polling.
This involves creating a Poller
, letting it know about our socket and polling to see whether the connection is still active.
So instead of a simple socket.recv()
, we do:
socks = dict(poll.poll(timeout_in_milliseconds))
if socks.get(socket) == zqm.POLLIN:
response = socket.recv()
else:
pass # We timed out
they see me POLLIN, they hatin’
Other than that, it pretty much follows the server code: establish the ZeroMQ connection, perform the handshake and then send messages and wait for the responses.
The server in Rust
In Rust, we can use the [zmq][zqm] crate. Let’s set up the project:
$ cargo new --bin ipc_server
$ cd ipc_server
$ cargo run
Compiling ipc_server v0.1.0 (file:///home/thomas/tmp/ipc_server)
Finished debug [unoptimized + debuginfo] target(s) in 0.22 secs
Running `target/debug/ipc_server`
Hello, world!
edit Cargo.toml
:
[package] name = "ipc_server" version = "0.1.0" authors = ["Tomas Sedovic <tomas@sedovic.cz>"] [dependencies] zmq = "0.8"
cargo build
to make sure we’re still good, then edit src/main.rs
:
extern crate zmq;
use std::error::Error;
fn server() -> Result<(), Box<Error>> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::REP)?;
socket.bind("ipc:///tmp/dose-response.ipc")?;
let receive_message = || {
socket.recv_bytes(0).map(|bytes| String::from_utf8(bytes))
};
let handshake = receive_message()??;
println!("{}", handshake);
socket.send_str("READY", 0)?;
Ok(())
}
fn main() {
if let Err(err) = server() {
println!("ERROR: {:?}", err);
}
}
cargo run --release
and try it against the Python client.
All we do for now is to complete the handshake. As you can see this is a fairly straightforward port of the Python server. There’s a little more error handling here, but that’s all.
For the full equivalent to the Python server we need to add the rand crate so put this in your Cargo.toml
under [dependencies]
:
rand = "0.3.15"
Now the full server:
extern crate rand;
extern crate zmq;
use std::error::Error;
use std::thread;
use std::time::Duration;
use rand::Rng;
fn server() -> Result<(), Box<Error>> {
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::REP)?;
socket.bind("ipc:///tmp/dose-response.ipc")?;
let receive_message = || {
socket.recv_bytes(0).map(|bytes| String::from_utf8(bytes))
};
let handshake = receive_message()??;
if handshake == "READY" {
socket.send_str("READY", 0)?;
loop {
let message = receive_message()??;
// Crash 10% of the time
if rand::thread_rng().next_f32() <= 0.1 {
panic!("Oh Noes!")
}
thread::sleep(Duration::from_millis(16));
socket.send_str("Hello World!", 0)?;
if message == "quit" {
break;
}
}
}
Ok(())
}
fn main() {
if let Err(err) = server() {
println!("ERROR: {:?}", err);
}
}
I was quite surprised how similar the servers are. The error handling complicates things a little (but actually much less than using unwrap
all over the place) and the bindings for both languages are virtually identical.
This does all I needed and it was quick and painless to get going and OMG I thought the Rust version was going to be much noisier!
As for ZeroMQ itself, this is my first foray into that area. I like that you can seamlessly switch from a single process to unix sockets to tcp.
The timeouts could have been easier to do but it’s like five more lines of code, so it’s not a huge bother. My understanding is that ZeroMQ just provides a very basic socket interface and you’re supposed to build whatever you need in terms of fault tolerance, high availability, etc. on top of those primitives. Fair enough.
I do wonder what do they do with all the consonants they saved, though.
PS: big shoutout to the ZeroMQ guide. It’s got examples in Python (in addition to C and a bunch of other languages) and it’s well written and readily skimmable!