And now for something completely different: a short tutorial on using ZeroMQ to distribute work between multiple processes.

It’s a not-uncommon problem in many areas of computing: you have a computationally expensive task that requires several attempts to get right (in my case it was rendering an offline slippy map of Western Australia, using the wonderful Mapnik, and it took many attempts to get the stylesheet just right). What to do? If you can split your job into discrete pieces of work then read on (and listen to this for some inspiration)!

Who is this for?

If you have a CPU-intensive job (for example rendering video or audio) that you need to run multiple times, perhaps with slightly varying parameters, this is for you. If you’re interested in ZeroMQ and want a working example that is a bit more sophisticated and complete than the examples from the ZeroMQ documentation this is for you. If you just like code snooping then this is for you too.

For the impatient, all the code discussed in this article is available here:https://bitbucket.org/hatchd/hatchd-wdd.

Isn’t this reinventing the wheel?

There is definitely an element of that. A good message broker (e.g. RabbitMQ) will do much of this, and a purpose-built application such as Celery will do much more, but there are still good reasons to do this yourself:

  • Control. If you have a very specific job it can be good to know exactly what’s going on, especially when it comes to debugging.
  • Flexibility. ZeroMQ is a more generic, lower-level way of achieving this outcome, which means you have to do a bit more yourself, but it means you CAN do more yourself.
  • Less components (in the ops sense). You don’t need more server applications running and more configuration to worry about.
  • Learning how to do stuff is fun! Get familiar with ZeroMQ and you’ll have a better understanding of how messaging systems are built.
  • And finally: you’ll be surprised by how little code this actually takes. It is not a great deal more than a similar solution using Celery.

A very quick intro to ZeroMQ

Although its name might lead you to believe it’s a message queue like RabbitMQ, this is not the case. ZeroMQ is not a server or a client; it is a library that provides networking functions with a higher level of abstraction than TCP sockets. Using it you can write your own servers and clients that communicate with each other, and even implement your own messaging system if you so desire.

TCP sockets are the building blocks for many kinds of networking, but they’re pretty basic at their core; you can read and write bytes, the bytes are guaranteed to be read out in the same order you wrote them (unlike UDP sockets), and unless you got an error the bytes are guaranteed to be delivered.

ZeroMQ provides “sockets” that have more intelligence. It has PUB/SUB sockets that allow multiple subscriber clients to connect to a publisher; it has REQ/REP sockets that allow for lockstep communication; it has ROUTER sockets that use a simple envelope scheme to handle routing; and more! The right combination of ZeroMQ sockets will allow you to build your own custom messaging topology, and if you want you can build it right into your existing application. There are bindings for all popular languages, and builds available for iOS and Android.

Requirements

Every project needs to have some documented requirements. Assuming that we have an arbitrary task that can be broken up into many “jobs”, here are our requirements:

  • A single controlling process that generates jobs and sends them out to workers.
  • One or more workers that perform the work required for the job and report the results back to the controller.

The approach

As a first cut we’ll try a quick and obvious approach: a Controller with a PUSH socket to send work out to many workers, and a PULL socket to receive the results from the worker. For the purposes of this tutorial our work will be performing a calculation on a list of numbers. PUSH sockets distribute messages to all connected clients in a round-robin fashion.

Here’s our Controller class. This will push out jobs to the workers and receive the results:

from multiprocessing import Process, Event
    import time
    import uuid
    import zmq


    class Controller(object):
        """Generate jobs, send the jobs out to workers and collect the results.
        """
        def __init__(self, stop_event):
            self.stop_event = stop_event
            self.context = zmq.Context()
            # Create a push socket to send out jobs to the workers
            self.socket = self.context.socket(zmq.PUSH)
            self.socket.bind('tcp://127.0.0.1:5755')
            # And a pull socket to accept the results of the work. In a very
            # simple case you could let the workers handle the results directly
            # (for example storing into a database) but this is a little
            # bit neater.
            self.socket_result = self.context.socket(zmq.PULL)
            self.socket_result.bind('tcp://127.0.0.1:5756')

        def work_iterator(self):
            """Return an iterator that yields work to be done.

            This iterator is super boring and yields successive ints using
            xrange().
            """
            return xrange(0, 10000)

        def run(self):
            for i in self.work_iterator():
                # For each job in our list of work send it out to a worker.
                # Messages sent using a push socket are round-robin load balanced
                # all connected workers.
                self.socket.send_json({'number': i})
                # Poll the incoming socket for results. poll() returns a Truthy
                # value if there are messages waiting.
                while self.socket_result.poll(0):
                    result = self.socket_result.recv_json()
                    print result['worker_id'], result['result']
                if self.stop_event.is_set():
                    break
            self.stop_event.set()

and our Worker class:

class Worker(object):
        """Accept work in the form of {'number': xxx}, square the number and
        send it back to the controller in the form
        {'result': xxx, 'worker_id': yyy}. Our "work" in this case is just
        squaring the contents of 'number'.
        """
        def __init__(self, stop_event):
            self.stop_event = stop_event
            self.context = zmq.Context()
            self.socket = self.context.socket(zmq.PULL)
            self.socket.connect('tcp://127.0.0.1:5755')
            self.socket_result = self.context.socket(zmq.PUSH)
            self.socket_result.connect('tcp://127.0.0.1:5756')
            # We'll send this id back with our results to make it easier
            # to verify that work is getting distributed among multiple
            # workers.
            self.my_id = uuid.uuid4().hex[:4]

        def run(self):
            while not self.stop_event.is_set():
                # Poll the socket for incoming messages. This will wait up to
                # 0.1 seconds before returning False. The other way to do this is
                # is to use zmq.NOBLOCK when reading from the socket, catching
                # zmq.AGAIN and sleeping for 0.1.
                while self.socket.poll(100):
                    work = self.socket.recv_json()
                    self.socket_result.send_json(
                        {'result': work['number'] ** 2, 'worker_id': self.my_id})

Note the use of Event from the multiprocessing module to communicate when the process should stop; this isn’t strictly necessary but we’ll be testing this using multiple processes launched from a single machine and it provides a nice way to tell everything to stop.

(The complete example is included as basic.py in the repo mentioned at the start of this post. If you’ve run buildout as per the instructions there’s also a run-basic executable in bin/ that will run it.)

So, what’s wrong with this? It definitely distributes work, the work is performed, and the results returned. If you’ve read the comments in the code above you may have guessed: the most significant problem is that PUSH sockets are round-robin load balanced. In our trivial example of squaring numbers this is ok, but if your jobs take a variable amount of time to run this is a big problem as you will most likely find some workers becoming heavily loaded while others idle.

A secondary problem is that ZeroMQ doesn’t queue messages in the way you might expect (at least if you’re comparing it to a message broker). There is a limit to the number of messages sitting in the outgoing queue of the PUSH socket (this number is controllable through the ZMQ_HWM setting) after which the PUSH socket will block (ZeroMQ sockets will either block or silently drop messages once the high water mark has been reached, depending on the socket type); this would appear to be the behaviour we want, but because there is a buffer on the PULL socket on the client as well the HWM behaviour will not be triggered, the controller will keep sending messages until there are none left (as long as there are workers available to receive). If a worker dies all the messages it has buffered are lost. We can set ZMQ_HWM on the PULL socket as well, but there are still buffers at the socket level. We could experiment with tuning ZMQ_SNDBUF and ZMQ_RCVBUF and possibly get a result. I have not attempted this, and I am somewhat skeptical that it would have the desired result. The documentation for ZMQ_SNDBUF and ZMQ_RCVBUF indicate that these options correspond to SO_SNDBUF and SO_RCVBUF on the underlying OS socket, which we really shouldn’t need to mess with (see socket for the gory details). In any case, the better solution is to move the load balancing logic into our application where we have some control over it.

For reference I have added to the repo mentioned at the start of the post a small example, hwm_test.py, that demonstrates our problem with ZMQ_HWM . You may also find the documentation for setsockopt useful.

A more sophisticated approach

Our first cut worked pretty well for a very simple case, but for real applications we need durability and reliability. Let’s add some new requirements:

  1. Work mustn’t be lost under any situation.
  2. All workers must be fully utilised, regardless of the variance in time taken for different jobs.
  3. Memory usage on the controller should remain fairly constant, regardless of how many items of work there are (this also means you need to be able to express your list of jobs as a generator, which can be challenging, but is very helpful if you can manage it).

The upshot of this is that we’re going to have to manage the load balancing ourselves; unfortunately this complicates our nice straightforward implementation by necessitating tracking which workers are connected and what jobs they have assigned to them, but we would have to do this anyway to fulfil our first requirement so it’s not as bad as it sounds.

Here’s a quick rundown of what we’re going to do to satisfy these new requirements:

  1. The controller needs to hand work out to specific workers based on how much work they already have, and once they’ve got a certain number of jobs don’t give them any more until the finish some. This means keeping track of which jobs have been assigned to which workers, and using a different type of socket, one that is able to address specific workers.
  2. Workers need to tell the controller when they connect. This means introducing control commands between the workers and the controller.
  3. Workers need to tell the controller when they disconnect. This will allow their work to be requeued, and no more work assigned.
  4. Our work items will be associated with a unique id, allowing us to track and requeue them when necessary. We’ll represent this with a new Job class and use UUIDs to track them.

Note that in a “real” implementation we would need to handle transient network conditions as well, not just polite disconnection messages. This generally means killing off clients that don’t respond in a certain amount of time (or something like a heartbeat check for each worker); I have omitted this as it adds a fair amount of boilerplate error handling and isn’t essential for this tutorial.

On the controller side we’ll switch out our PUSH socket for a ROUTER socket; ROUTER sockets have the following significant characteristics compared to PUSH sockets:

  • ROUTER sockets can both send and receive messages.
  • When messages are received they include a unique id representing the id of the sender (we can of course handle this ourselves, but it’s nice to have it done automatically).
  • When messages are sent a client id must be specified, and the message will be routed to this client only.

We’ll also change our worker to use a single DEALER socket and remove the PULL socket on the controller that we were using to receive results; if you wanted to send your results to some other place you could leave it, but for our purposes one socket on each end that can both send and receive will do nicely.

Aside: if you’ve spent some time with the ZeroMQ docs you might be wondering if you could use a pair of REQREP sockets for this. REQ/REP sockets allow both sending and receiving, but they enforce a much less flexible communication pattern so are not really suitable for our application.

First let’s modify our Controller’s constructor to use a ROUTER socket, and give it an instance variable to keep track of workers:

def __init__(self, control_port=CONTROL_PORT):
        self.context = zmq.Context()
        self.socket = context.socket(zmq.ROUTER)
        self.socket.bind('tcp://*:{0}'.format(port))
        # We'll keep our workers here, this will be keyed on the worker id,
        # and the value will be a dict of Job instances keyed on job id.
        self.workers = {}
        # We won't assign more than 50 jobs to a worker at a time; this ensures
        # reasonable memory usage, and less shuffling when a worker dies.
        self.max_jobs_per_worker = 50

Now, to track our workers and do some basic load balancing we’ll add a method that returns the id of the next available worker (the next step will be to handle worker registration and disconnection, but that’s a bit more involved so we’ll get this out of the way first):

def _get_next_worker_id(self):
        """Return the id of the next worker available to process work. Note
        that this will return None if no clients are available.
        """
        # It isn't strictly necessary since we're limiting the amount of work
        # we assign, but just to demonstrate that we could have any
        # algorithm here that we wanted we'll find the worker with the least
        # work and try that.
        worker_id, work = sorted(workers.items(), key=lambda x: len(x[1]))[0]
        if len(work) < self.max_jobs_per_worker:
            return worker_id
        # No worker is available. Our caller will have to handle this.
        return None

Here’s our new Job class. The Controller.workers attribute will contain collections of these keyed by worker id:

class Job(object):
        def __init__(self, work):
            self.id = uuid.uuid4().hex
            self.work = work

In this case ‘work’ will just be a dict containing an integer, as before. Here’s what our _work_iterator now looks like. Note that it yields Jobs, and that it will yield from Jobs that were requeued from failed clients:

def work_iterator(self):
        # iter() makes our xrange object into an iterator so we can use
        # next() on it.
        iterator = iter(xrange(0, 10000))
        while True:
            # Return requeued work first. We could simplify this method by
            # returning all new work then all requeued work, but this way we
            # know _work_to_requeue won't grow too large in the case of
            # many disconnects.
            if self._work_to_requeue:
                yield self._work_to_requeue.pop()
            else:
                yield Job({'number': next(iterator)})

Next up is a method to handle messages our workers send us. We’ll do just the bare minimum: connection, disconnection, work completion. Let’s define some basic messages, just using plain dicts for convenience. Once we’re done with the Controller we’ll teach our Worker how and when to send these:

def _handle_worker_message(self, worker_id, message):
        """Handle a message from the worker identified by worker_id.

        {'message': 'connect'}
        {'message': 'disconnect'}
        {'message': 'job_done', 'job_id': 'xxx', 'result': 'yyy'}
        """
        if message['message'] == 'connect':
            assert worker_id not in self.workers
            self.workers[worker_id] = {}
        elif message['message'] == 'disconnect':
            remaining_work = self.workers.pop(worker_id)
            # Remove the worker so no more work gets added, and put any
            # remaining work into _work_to_requeue
            self._work_to_requeue.extend(remaining_work.values())
        elif message['message'] == 'work_done':
            result = message['result']
            job = self.workers[worker_id].pop([message['job_id']])
            # _process_results() is just a trivial logging function so I've
            # omitted it from here, but you can find it in the final source
            # code.
            self._process_results(worker_id, job, result)

We’re on the home stretch now! All the pieces of our Controller are there, we just need to assemble them into the main loop:

def run(self):
        for job in self.work_iterator():
            next_worker_id = None
            while next_worker_id is None:
                # First check if there are any worker messages to process. We
                # do this while checking for the next available worker so that
                # if it takes a while to find one we're still processing
                # incoming messages.
                while self.socket_result.poll(0):
                    # Note that we're using recv_multipart() here, this is a
                    # special method on the ROUTER socket that includes the
                    # id of the sender. It doesn't handle the json decoding
                    # automatically though so we have to do that ourselves.
                    worker_id, message = self.socket_result.recv_multipart()
                    message = json.loads(message)
                    self._handle_worker_message(worker_id, message)
                # If there are no available workers (they all have 50 or
                # more jobs already) sleep for half a second.
                next_worker_id = self._get_next_worker_id()
                if next_worker_id is None:
                    time.sleep(0.5)
            # We've got a Job and an available worker_id, all we need to do
            # is send it. Note that we're now using send_multipart(), the
            # counterpart to recv_multipart(), to tell the ROUTER where our
            # message goes.
            self.socket.send_multipart(
                [next_worker_id, json.dumps((job.id, job.work))])
            if self.stop_event.is_set():
                break
        self.stop_event.set()

And that’s it. We’ve got a Controller that tracks the workers connected to it and load balances the job queue in a (somewhat) intelligent way. We’ve also got a bit of a bonus in that new workers can connect and start receiving work while the system is running; this is a very useful property, if your job starts taking too long you can just throw more resources at it, and if for whatever reason you need to kill a worker that’s fine too.

Now let’s take a look at our Worker class. Fortunately the Worker changes will be a bit easier to make as the Worker isn’t really doing too much differently. Here’s what we’re going to do:

  1. Change our Worker socket to a DEALER to make it aware of the client ids that the Controller needs.
  2. Update the messaging format and send some additional messages.

Here’s the relevant lines in the constructor to set up a DEALER socket and give it an identity:

self.socket = self.context.socket(zmq.DEALER)
    # We don't need to store the id anymore, the socket will handle it
    # all for us.
    self.socket.identity = uuid.uuid4().hex[:4]
    self.socket.connect('tcp://127.0.0.1:5755')

And our modified run loop:

try:
            # Send a connect message
            self.socket.send_json({'message': 'connect'})
            # Poll the socket for incoming messages. This will wait up to
            # 0.1 seconds before returning False. The other way to do this
            # is is to use zmq.NOBLOCK when reading from the socket,
            # catching zmq.AGAIN and sleeping for 0.1.
            while not self.stop_event.is_set():
                if self.socket.poll(100):
                    # Note that we can still use send_json()/recv_json() here,
                    # the DEALER socket ensures we don't have to deal with
                    # client ids at all.
                    job_id, work = self.socket.recv_json()
                    self.socket.send_json(
                        {'message': 'job_done',
                         'result': self._do_work(work),
                         'job_id': job_id})
        finally:
            self._disconnect()

Note that I’ve refactored the calculation into _do_work(); it’s the same as before with the exception that it now has a random sleep in it to give a better simulation of non-uniform workloads:

def _do_work(self, work):
        result = work['number'] ** 2
        time.sleep(random.randint(1, 10))
        return result

The only part we’re missing is the disconnect message. I have opted to only call this when the run loop exits (either due to the stop event being set or an exception), and as such made it a private method:

def _disconnect(self):
        """Send the Controller a disconnect message and end the run loop.
        """
        self.stop_event.set()
        self.socket.send_json({'message': 'disconnect'})

You can run a demo of all the parts of this by running controller.py. If you’ve used buildout this there’s shortcut in the bin/ directory:

$ bin/run-full

And that’s (mostly) all there is to it. Of course, in the real world you have to worry about provisioning servers to run your workers, transferring configuration, collating results, progress reporting and estimating time remaining, and many other fun things that I have conveniently ignored.

However, as a bonus for making it to the end (if you skipped to the end I will know) I’ve also included a ready to use component that you may find helpful for in building your own distributed application: an in-process file synchroniser. Check out wdd/filesync.py, it is used like so:

# In your controller:
    sync_server = filesync.Server('/path/to/your/files')
    threading.Thread(target=sync_server.run).start()

    # and in your Worker (this will block until complete):
    filesync.Client('/path/to/your/files', 'example.com').sync()

If you have, for example, a few gb of source images and an instruction file you can store these on the Controller’s server and on each run the Workers will synchronise the changes.

Easy peasy. I hope this tutorial has been educational, and given you some ideas for building your own custom networking applications using ZeroMQ. As mentioned previously, all code is available here.

Interested in working with us?

Whether you have a clearly defined product brief or you're not sure wherein the problem lies, drop us a line for a no-pressure chat about where you are at and how we might help.

A cute animal

You found !