ØMQ is a messaging system, or "message-oriented middleware" if you will. It is used in environments as diverse as financial services, game development, embedded systems, academic research, and aerospace.
Messaging systems basically work like instant messaging for applications. An application decides to communicate an event to another application (or to multiple applications), assembles the data to be sent, hits the "send" button, and lets the messaging system take care of the rest. Unlike instant messaging, though, messaging systems have no GUI and assume no human beings at the endpoints capable of intelligent intervention when something goes wrong. Messaging systems thus have to be both fault-tolerant and much faster than common instant messaging.
ØMQ was originally conceived as an ultra-fast messaging system for stock trading and so the focus was on extreme optimization. The first year of the project was spent devising benchmarking methodology and trying to define an architecture that was as efficient as possible.
Later on, approximately in the second year of development, the focus shifted to providing a generic system for building distributed applications and supporting arbitrary messaging patterns, various transport mechanisms, arbitrary language bindings, etc.
During the third year, the focus was mainly on improving usability and flattening the learning curve. We adopted the BSD Sockets API, tried to clean up the semantics of individual messaging patterns, and so on.
This article will give insight into how the three goals above translated into the internal architecture of ØMQ, and provide some tips for those who are struggling with the same problems.
Since its third year, ØMQ has outgrown its codebase: there is an initiative to standardize the wire protocols it uses, an experimental implementation of a ØMQ-like messaging system inside the Linux kernel, and so on. These topics are not covered here; see online resources for further details.
Application vs. Library
ØMQ is a library, not a messaging server. It took us several years of working on the AMQP protocol (a financial industry attempt to standardize the wire protocol for business messaging), writing a reference implementation for it, and participating in several large-scale projects heavily based on messaging technology to realize that there's something wrong with the classic client/server model of a smart messaging server (broker) and dumb messaging clients.
Our primary concern was performance: if there's a server in the middle, each message has to cross the network twice (from the sender to the broker and from the broker to the receiver), incurring a penalty in both latency and throughput. Moreover, if all messages pass through the broker, at some point the server is bound to become the bottleneck.
A secondary concern was related to large-scale deployments: when a deployment crosses organizational boundaries, the concept of a central authority managing the whole message flow no longer applies. No company is willing to cede control to a server in a different company because of trade secrets and legal liability. The result in practice is one messaging server per company, with hand-written bridges connecting it to messaging systems in other companies. The whole ecosystem is thus heavily fragmented, and maintaining a large number of bridges for every company involved doesn't make the situation better. To solve this problem, we need a fully distributed architecture, one where every component can potentially be governed by a different business entity. Given that the unit of management in a server-based architecture is the server, we can solve the problem by installing a separate server for each component. In that case we can further optimize the design by having the server and the component share the same process. What we end up with is a messaging library.
ØMQ was started when we got an idea about how to make messaging work without a central server. It required turning the whole concept of messaging upside down and replacing the model of an autonomous centralized store of messages in the center of the network with a "smart endpoint, dumb network" architecture based on the end-to-end principle. The technical consequence of that decision was that ØMQ, from the very beginning, was a library, not an application.
We've been able to prove that this architecture is both more efficient (lower latency, higher throughput) and more flexible (it's easy to build arbitrarily complex topologies instead of being tied to the classic hub-and-spoke model) than standard approaches.
One of the unintended consequences was that opting for the library model improved the usability of the product. Users repeatedly tell us they are glad they don't have to install and manage a stand-alone messaging server. It turns out that not having a server is the preferred option, as it cuts operational costs (no need for a messaging server admin) and improves time-to-market (no need to negotiate running the server with the client, the management or the operations team).
The lesson learned is that when starting a new project, you should opt for the library design if at all possible. It's pretty easy to create an application from a library by invoking it from a trivial program; however, it's almost impossible to create a library from an existing executable. A library offers much more flexibility to the users, at the same time sparing them non-trivial administrative effort.
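To make the asymmetry concrete, here is a minimal sketch of turning a library into an application. The function `count_words` is a hypothetical stand-in for any library entry point; the point is only that the "application" is a few lines that call it, while the reverse (extracting a reusable function from a finished executable) has no comparably simple recipe.

```python
import sys

# Library part: a reusable function with no I/O and no global state.
# (Hypothetical example function, standing in for any library API.)
def count_words(text: str) -> int:
    """Return the number of whitespace-separated words in `text`."""
    return len(text.split())

# Application part: a trivial program that just invokes the library.
def main(argv: list[str]) -> None:
    print(count_words(" ".join(argv[1:])))

if __name__ == "__main__":
    main(sys.argv)
```

Other programs can `import` the module and call `count_words` directly; only the last two lines make it a standalone tool.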
Global variables don't play well with libraries. A library may be loaded several times in a process, but even then there's only a single set of global variables. Figure 1 shows the ØMQ library being used by two different, independent libraries, both of which are in turn used by the same application.
Figure 1: ØMQ being used by different libraries.
When this happens, both instances of ØMQ access the same variables, resulting in race conditions, strange failures and undefined behavior. To prevent this problem, the ØMQ library has no global variables. Instead, the user of the library is responsible for creating the global state explicitly. The object containing the global state is called the context. While from the user's perspective the context looks more or less like a pool of worker threads, from ØMQ's perspective it's just an object that stores whatever global state we happen to need. In Figure 1, libA would have its own context and libB would have its own as well, so there would be no way for one of them to break or subvert the other.
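The context idea can be sketched in a few lines. This is a hypothetical Python illustration, not ØMQ's implementation: the `Context` class, its `socket()` and `term()` methods, and the string "sockets" are invented stand-ins. (In the real C API, the context is the handle returned by `zmq_ctx_new()`.) What matters is that all state lives in an object the caller creates, so two libraries holding separate contexts cannot interfere with each other.

```python
class Context:
    """Hypothetical context: holds all state that would otherwise be global."""

    def __init__(self) -> None:
        self.sockets: list[str] = []  # state owned by this context only
        self.closed = False

    def socket(self, name: str) -> str:
        if self.closed:
            raise RuntimeError("context is terminated")
        self.sockets.append(name)
        return name

    def term(self) -> None:
        # Tears down only this context's state; other contexts are untouched.
        self.sockets.clear()
        self.closed = True


# Two independent libraries (libA and libB), each creating its own context.
lib_a_ctx = Context()
lib_b_ctx = Context()
lib_a_ctx.socket("a-1")
lib_b_ctx.socket("b-1")
lib_b_ctx.term()          # libB shutting down...
lib_a_ctx.socket("a-2")   # ...does not affect libA
```

Had the socket list been a module-level global, `term()` in libB would have wiped out libA's sockets as well.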
The lesson here is pretty obvious: Don't use global state in libraries. If you do, the library is likely to break when it happens to be instantiated twice in the same process.
When ØMQ was started, its primary goal was to optimize performance. The performance of messaging systems is expressed using two metrics: throughput (how many messages can be passed during a given amount of time) and latency (how long it takes for a message to get from one endpoint to the other).
Which metric should we focus on? What's the relationship between the two? Isn't it obvious? Run the test, divide the overall time of the test by the number of messages passed, and what you get is latency. Divide the number of messages by the time and what you get is throughput. In other words, latency is the inverse value of throughput. Trivial, right?
Instead of starting to code straight away, we spent some weeks investigating the performance metrics in detail, and we found that the relationship between throughput and latency is much more subtle than that and that the metrics are often quite counter-intuitive.
Imagine A sending messages to B (see Figure 2). The overall time of the test is 6 seconds. There are 5 messages passed. Therefore, the throughput is 0.83 messages/sec (5/6) and the latency is 1.2 sec (6/5), right?
Figure 2: Sending messages from A to B.
Have a look at the diagram again. Each message takes a different amount of time to get from A to B: 2 sec, 2.5 sec, 3 sec, 3.5 sec and 4 sec. The average is 3 seconds, which is pretty far from our original calculation of 1.2 seconds. This example shows the kind of misconceptions people are intuitively inclined to make about performance metrics.
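The arithmetic of the example is short enough to write out. The numbers below come straight from the text (a 6-second test and the five per-message latencies); the code just contrasts the naive "test time divided by message count" with the actual mean of per-message latencies.

```python
test_duration = 6.0                    # seconds, whole test (from the text)
latencies = [2.0, 2.5, 3.0, 3.5, 4.0]  # per-message A->B latency, seconds

naive_latency = test_duration / len(latencies)  # 6 / 5: not a latency at all
mean_latency = sum(latencies) / len(latencies)  # average of real latencies

print(f"naive 'latency': {naive_latency:.1f} s")     # naive 'latency': 1.2 s
print(f"mean per-message latency: {mean_latency:.1f} s")  # mean per-message latency: 3.0 s
```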
Now have a look at the throughput. The overall time of the test is 6 seconds. However, at A it takes just 2 seconds to send all the messages. From A's perspective the throughput is 2.5 msgs/sec (5/2). At B it takes 4 seconds to receive all messages, so from B's perspective the throughput is 1.25 msgs/sec (5/4). Neither of these numbers matches our original calculation of 0.83 msgs/sec.
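The same point can be checked with per-message timestamps. The send times below are an assumption: the figure is not reproduced here, so we pick A sending one message every 0.5 s, which, combined with the latencies from the text, matches the stated 2-second sending window, 4-second receiving window and 6-second test. `throughput_at` is a hypothetical helper that follows the text's arithmetic (message count divided by the time between the first and last event observed at one point).

```python
# Assumed send times (s) at A, consistent with the numbers in the text.
send_times = [0.0, 0.5, 1.0, 1.5, 2.0]
latencies = [2.0, 2.5, 3.0, 3.5, 4.0]  # from the text
recv_times = [s + l for s, l in zip(send_times, latencies)]  # arrival times at B

def throughput_at(times: list[float]) -> float:
    """Throughput observed at a single point: messages / observed time span."""
    return len(times) / (max(times) - min(times))

print(recv_times)                  # [2.0, 3.0, 4.0, 5.0, 6.0]
print(throughput_at(send_times))   # 2.5  (at A)
print(throughput_at(recv_times))   # 1.25 (at B)
```

Note that throughput is computed from timestamps at one point only, whereas each latency needs the same message's timestamps at two points.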
To make a long story short, latency and throughput are two different metrics; that much is obvious. The important thing is to understand the difference between the two and their relationship. Latency can be measured only between two different points in the system; there is no such thing as latency at point A. Each message has its own latency. You can average the latencies of multiple messages; however, there's no such thing as latency of a stream of messages.
Throughput, on the other hand, can be measured only at a single point of the system. There's a throughput at the sender, there's a throughput at the receiver, there's a throughput at any intermediate point between the two, but there's no such thing as overall throughput of the whole system. And throughput makes sense only for a set of messages; there's no such thing as throughput of a single message.
As for the relationship between throughput and latency, it turns out there really is one; however, the formula involves integrals and we won't discuss it here. For more information, read the literature on queuing theory. There are many more pitfalls in benchmarking messaging systems that we won't go into further. The stress should rather be placed on the lesson learned: Make sure you understand the problem you are solving. Even a problem as simple as "make it fast" can take a lot of work to understand properly. What's more, if you don't understand the problem, you are likely to build implicit assumptions and popular myths into your code, making the solution either flawed or at least much more complex or much less useful than it could possibly be.
We discovered during the optimization process that three factors have a crucial impact on performance:
- Number of memory allocations
- Number of system calls
- Concurrency model
However, not every memory allocation or every system call has the same effect on performance. The performance we are interested in for messaging systems is the number of messages we can transfer between two endpoints during a given amount of time. Alternatively, we may be interested in how long it takes for a message to get from one endpoint to another.
However, given that ØMQ is designed for scenarios with long-lived connections, the time it takes to establish a connection or the time needed to handle a connection error is basically irrelevant. These events happen very rarely and so their impact on overall performance is negligible.
The part of a codebase that gets used very frequently, over and over again, is called the critical path; optimization should focus on the critical path.
Let's look at an example: ØMQ is not extremely optimized with respect to memory allocations. When manipulating strings, for instance, it often allocates a new string for each intermediate phase of the transformation. However, if we look strictly at the critical path (the actual message passing), we'll find that it uses almost no memory allocations. If messages are small, it's just one memory allocation per 256 messages (these messages are held in a single large allocated memory chunk). If, in addition, the stream of messages is steady, without huge traffic peaks, the number of memory allocations on the critical path drops to zero (the allocated memory chunks are not returned to the system, but reused repeatedly).
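The chunk-and-reuse idea can be modeled as a small FIFO. This is a hypothetical Python sketch of the technique described above, not ØMQ's actual (C++) queue; the class and attribute names are invented, and "allocation" here means allocating a whole 256-slot chunk, which is the cost the text is counting.

```python
from collections import deque

CHUNK_SIZE = 256  # messages per chunk, as in the text

class ChunkedQueue:
    """Hypothetical FIFO storing items in fixed-size chunks. One emptied chunk
    is kept as a spare and reused, so a steady stream stops allocating."""

    def __init__(self) -> None:
        self.chunks = deque()   # chunks in use, oldest first
        self.spare = None       # one emptied chunk kept for reuse
        self.allocations = 0    # number of chunks ever allocated
        self.head = 0           # read position in the oldest chunk
        self.tail = CHUNK_SIZE  # write position in the newest chunk (full: need new one)

    def _new_chunk(self) -> list:
        if self.spare is not None:             # reuse instead of allocating
            chunk, self.spare = self.spare, None
            return chunk
        self.allocations += 1
        return [None] * CHUNK_SIZE

    def push(self, item) -> None:
        if self.tail == CHUNK_SIZE:            # newest chunk full: get another
            self.chunks.append(self._new_chunk())
            self.tail = 0
        self.chunks[-1][self.tail] = item
        self.tail += 1

    def pop(self):
        if not self.chunks or (len(self.chunks) == 1 and self.head == self.tail):
            raise IndexError("pop from empty queue")
        item = self.chunks[0][self.head]
        self.chunks[0][self.head] = None
        self.head += 1
        if self.head == CHUNK_SIZE:            # oldest chunk fully consumed:
            self.spare = self.chunks.popleft() # keep it (any older spare is dropped)
            self.head = 0
        return item
```

With a steady stream (one push followed by one pop, repeated), only the very first chunk is ever allocated; a burst of 1,000 messages without consumers needs four chunks, that is, one allocation per 256 messages.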
Lesson learned: Optimize where it makes a difference. Optimizing pieces of code that are not on the critical path is wasted effort.