In distributed computing, all-pairs communication, where each machine constantly sends data to and receives data from every other machine, is common. For example, message shuffling in MapReduce and Google's Pregel is an all-pairs communication. However, it is challenging to design an MPI-based all-pairs communication module that fully utilizes the network bandwidth, due to many unexpected behaviors of MPI.
Consider an example where each machine needs to constantly request data from the other machines. We examine two solutions: (S1) all-to-all style synchronous message shuffling, and (S2) server-style message receiving, where a request/response server listens on its channel and keeps processing each received request/response (from MPI_ANY_SOURCE).
For (S1) all-to-all style communication, we cannot use MPI_Alltoall/MPI_Alltoallv since (1) they somehow do not scale with the number of processes in both MPICH and Open MPI, and (2) they do not take a tag as input and so may interfere with other concurrent MPI communications (see the two MPI pitfalls mentioned here). We therefore implement the deterministic synchronous all-to-all communication using MPI point-to-point communication primitives, and the code can be found here. There, as a simulation, we first let all worker processes (one on each machine) send a request of size BUF_SIZE to every other worker process using a synchronous all-to-all communication, and then let all workers send a response of size fold * BUF_SIZE back to every other worker process using a second synchronous all-to-all communication. These two steps are repeated, and the cumulative volume of received responses is periodically reported on the console, which reflects the communication throughput. Unfortunately, in a Gigabit Ethernet environment with 16 machines, we observe that this solution only delivers a throughput of around 100Mbps (i.e., a 10% utilization rate), even though the communication volume is perfectly balanced.
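For concreteness, below is a minimal sketch of one deterministic synchronous all-to-all round built from point-to-point primitives. BUF_SIZE comes from the description above; the tag value and the ring-style pairing schedule are illustrative assumptions and need not match the linked code. The response round looks the same, just with blocks of size fold * BUF_SIZE.

```cpp
#include <mpi.h>
#include <vector>

static const int BUF_SIZE = 1024;    // request size (1kB, as in the text)
static const int SHUFFLE_TAG = 100;  // assumed tag to isolate this shuffle
                                     // from other concurrent MPI traffic

// send_buf and recv_buf each hold nprocs * BUF_SIZE bytes (one block per peer).
void all_to_all_round(const std::vector<char>& send_buf,
                      std::vector<char>& recv_buf,
                      int rank, int nprocs)
{
    for (int step = 1; step < nprocs; ++step) {
        // Ring schedule: in step s, rank r sends to (r+s) and receives from (r-s),
        // so every pair of processes exchanges exactly once per round.
        int dst = (rank + step) % nprocs;
        int src = (rank - step + nprocs) % nprocs;
        MPI_Sendrecv(send_buf.data() + dst * BUF_SIZE, BUF_SIZE, MPI_CHAR,
                     dst, SHUFFLE_TAG,
                     recv_buf.data() + src * BUF_SIZE, BUF_SIZE, MPI_CHAR,
                     src, SHUFFLE_TAG,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
}
```

Using MPI_Sendrecv for each pair keeps the schedule deterministic and deadlock-free, and the explicit tag avoids the interference problem of the collective primitives mentioned above.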
Alternatively, for (S2) server-style communication, we maintain a request server and a response server. Each server listens on its own channel (using MPI_Iprobe) and processes each received message. However, since a request has size BUF_SIZE while a response has size fold * BUF_SIZE, which is fold times larger, if we keep the requesting channel busy, the response channel becomes congested (note that this cannot happen with the all-to-all style communication). We observe that if requests keep being sent, the communication becomes very unbalanced and the throughput drops to almost 0 between many pairs of machines; gradually, almost no communication progress is made on any machine (we are not sure why; it may be related to the implementation of MPICH). However, when congestion does not happen, the throughput is many times higher than that of the all-to-all style communication.
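Below is a minimal sketch of such a server-style receive loop over MPI_Iprobe. The tag value and the process_request() handler are placeholders rather than parts of the actual implementation; the response server would look the same on its own tag.

```cpp
#include <mpi.h>
#include <vector>

static const int REQUEST_TAG = 200;  // assumed tag for the request channel

// Placeholder handler: the real system would serve the request and send a response.
void process_request(const std::vector<char>& msg, int src) {
    (void)msg; (void)src;
}

// One polling pass of the request server: drain every request currently queued
// from any source, then return so the caller can do other work.
void poll_request_server() {
    int flag = 0;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, REQUEST_TAG, MPI_COMM_WORLD, &flag, &status);
    while (flag) {
        int count = 0;
        MPI_Get_count(&status, MPI_CHAR, &count);   // size of the probed message
        std::vector<char> msg(count);
        MPI_Recv(msg.data(), count, MPI_CHAR,
                 status.MPI_SOURCE, REQUEST_TAG,    // receive exactly that message
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        process_request(msg, status.MPI_SOURCE);
        MPI_Iprobe(MPI_ANY_SOURCE, REQUEST_TAG, MPI_COMM_WORLD, &flag, &status);
    }
}
```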
To sustain network throughput while avoiding congestion, our solution is to let each machine sleep for a while after sending a batch of messages to every other machine, and the code can be found here. Obviously, the throughput is inversely proportional to the sleep time. Interestingly, by varying the number of processes, we find that the network throughput increases linearly with the number of processes. Thus, when a program is run with many processes/machines, the estimated throughput would exceed 1Gbps and congestion would occur. In fact, while congestion does not happen in our private cluster when the throughput is above 100MB/s, we find that on Azure it is safer to keep the throughput at around 60MB/s, or congestion may happen. Moreover, this holds only when BUF_SIZE is 1kB (we set fold to 100, so the volume of a batch of response messages is 100kB); if the message batch becomes larger, congestion also happens. As a result, to avoid congestion, we keep the largest volume of a batch of messages at around 100kB, and increase the sleep time proportionally to the number of processes to keep the throughput at around 60MB/s. This strategy has been integrated into our G-thinker system.
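Below is a minimal sketch of this throttled request loop, using the numbers quoted above (1kB requests, fold = 100, a roughly 60MB/s target). The sleep formula, tag, and helper names are illustrative assumptions, not the exact G-thinker code.

```cpp
#include <mpi.h>
#include <chrono>
#include <thread>
#include <vector>

static const int    BUF_SIZE    = 1024;  // 1kB request
static const int    FOLD        = 100;   // each request triggers a 100kB response
static const int    REQ_TAG     = 300;   // assumed request tag
static const double TARGET_MBPS = 60.0;  // safe per-machine receive rate (MB/s)

// Sleep time per batch: one batch of requests pulls roughly
// (nprocs - 1) * FOLD * BUF_SIZE bytes of responses, so the sleep grows linearly
// with the number of processes to cap the receive rate at TARGET_MBPS.
std::chrono::microseconds batch_sleep(int nprocs) {
    double batch_bytes = double(nprocs - 1) * FOLD * BUF_SIZE;
    double seconds = batch_bytes / (TARGET_MBPS * 1024.0 * 1024.0);
    return std::chrono::microseconds(static_cast<long long>(seconds * 1e6));
}

// Keep requesting data from every other machine, pausing after each batch.
// Responses are assumed to be handled by a separate response server.
void request_loop(int rank, int nprocs) {
    std::vector<char> req(BUF_SIZE, 0);
    while (true) {
        for (int dst = 0; dst < nprocs; ++dst) {
            if (dst == rank) continue;
            MPI_Send(req.data(), BUF_SIZE, MPI_CHAR, dst, REQ_TAG, MPI_COMM_WORLD);
        }
        // Throttle: pause before the next batch so the response channel
        // does not get congested.
        std::this_thread::sleep_for(batch_sleep(nprocs));
    }
}
```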