Memory Leak issue in ParallelComm Class (Resolved)
Recently, a user reported a memory leak in the ParallelComm class when exchanging tags; see exchange-tags-memory-leak.
Original Example Test
In the original example test (see attached file mpitest1.cpp), memory usage keeps increasing when tag exchange is called in a loop. This test depends on MOAB (the input mesh file is MeshFiles/unittest/64bricks_1khex.h5m), so we need to run it in the unit test folder or example folder (we can temporarily overwrite an existing unit test or example). With this test, we have confirmed that:
The memory leak occurs only when a long message (larger than INITIAL_BUFF_SIZE) has to be broken into two parts to be sent out (the 2nd part is sent after an ack is received). If we increase INITIAL_BUFF_SIZE, e.g. from 1,024 to 16,384, or use a smaller mesh file for testing, there is no leak; in either case, the message size is always less than INITIAL_BUFF_SIZE.
At that point, the cause was still unclear; it could have been:
1) A bug in the Buffer class, which uses malloc, free and memcpy
2) A bug in the pack/unpack code
3) Misuse of MPI calls
We extracted some code from the ParallelComm class and designed a simplified test (see attached file mpitest2.cpp) for easier debugging; it does not depend on MOAB, so we no longer have to look deep into the pack/unpack code, and we can configure the message size freely to test the possible scenarios.
This test runs on two processors only, and the memory leak issue can still be reproduced:
1) If both processors send out long messages, they both have memory leak
2) If both processors send out short messages, they both have no leak
3) If processor 0 sends out long messages and processor 1 sends out short messages, only processor 0 has memory leak
Since we found no apparent bug in the Buffer class, and the memory leak can be reproduced without pack/unpack, we began to suspect misuse of MPI calls. In a simple example we tested, a loop contains only an MPI_Irecv (with no corresponding MPI_Isend); a memory leak is observed after several iterations, because the memory MPI allocates for the MPI_Irecv is never freed at the end of each iteration.
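The pattern can be mimicked without MPI. In the hypothetical model below (all names are ours), each posted receive allocates bookkeeping state that is only released on completion; posting receives in a loop with no matching sends makes the pending state grow without bound:

```cpp
#include <cassert>

// Hypothetical stand-in for the per-request state the MPI runtime keeps
// alive from the moment an MPI_Irecv is posted until it completes.
struct MockRuntime {
    int pending = 0;                  // posted but uncompleted receives
    void post_irecv() { ++pending; }  // models MPI_Irecv with no match
};

// After n loop iterations that only post receives, n allocations remain.
int pending_after(int n) {
    MockRuntime rt;
    for (int i = 0; i < n; ++i)
        rt.post_irecv();  // never completed: no corresponding MPI_Isend
    return rt.pending;
}
```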
A Pitfall of Shared MPI Requests
We reviewed the simplified test (about 300 lines of code) carefully and eventually noticed the existence of shared MPI requests.
Assuming the two processors progress at a similar speed, an iteration on each processor sequentially does the following:
1) Post MPI_Irecv on 1st part of the message, with recv_req0
2) Post MPI_Irecv on ack, with recv_req1
3) Post MPI_Isend on 1st part of the message, with send_req0
4) Call MPI_Waitany on recv_req0 and recv_req1
After the wait, recv_req0 should be the one that finished, and the 1st part of the message has been received (the ack has not been sent out yet by the other processor)
5) Post MPI_Irecv on the 2nd part of the message, with recv_req0, the same request object used in 1)
6) Post MPI_Isend on ack, with send_req1
7) Call MPI_Waitany again on recv_req0 and recv_req1
After the wait, recv_req1 should be the one that finished, and the ack has been received (the 2nd part has not been sent out yet by the other processor)
8) Post MPI_Isend on 2nd part of the message, with send_req0, the same request object used by 3)
9) Call MPI_Waitany a 3rd time on recv_req0 and recv_req1
After the wait, recv_req0 should be the one that finished, and the 2nd part of the message has been received
10) Call MPI_Waitall on send_req0 and send_req1
So in 1) and 5), the same MPI receive request object is shared, while in 3) and 8), the same MPI send request object is shared.
Will the shared MPI request objects cause any issues? For 5), it might be OK to reuse recv_req0, since the receive from 1) has completed by the MPI_Waitany in 4). For 8), however, there is no MPI_Wait on send_req0 between 3) and its reuse, and this is a problem for 10): send_req0 now refers to the send posted in 8), so if the send from 3) has not finished when its request is overwritten, there is no handle left to wait on it. The memory MPI allocated for that MPI_Isend is then leaked when execution moves to the next iteration.
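The hazard can be modeled without MPI. In this toy model (all names are ours), each nonblocking operation owns runtime state identified by an id, and a request object is just a slot holding the id of the operation most recently posted into it; overwriting the slot before waiting orphans the earlier operation's state:

```cpp
#include <cassert>
#include <set>

struct Runtime {
    std::set<int> live;  // ids of operations whose state is still allocated
    int next_id = 0;

    void post(int &slot) {  // models MPI_Isend / MPI_Irecv
        slot = next_id;
        live.insert(next_id++);
    }
    void wait(int slot) {   // models MPI_Wait* on that slot
        live.erase(slot);
    }
};

// Steps 3), 8) and 10) with a shared send request: the wait in 10) only
// covers the operation posted in 8), so the state from 3) is orphaned.
int leaked_by_sharing() {
    Runtime rt;
    int send_req0 = -1;
    rt.post(send_req0);  // 3) send 1st part of the message
    rt.post(send_req0);  // 8) send 2nd part, same slot, no wait in between
    rt.wait(send_req0);  // 10) waits only on the send from 8)
    return (int)rt.live.size();  // 1: the operation from 3) is unreachable
}
```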
For send requests, we suggest no sharing (unless we explicitly call MPI_Wait on send_req0 before 8), which we prefer not to do). For receive requests, sharing seems to work, but this assumes that the MPI_Waitany in 4) returns because recv_req0 has finished (and not recv_req1), which has not been proven yet.
What Is the Current Fix?
A safe fix is simply to avoid sharing MPI request objects, at the cost of slightly more (but negligible) memory: different send/receive request objects are used for the 1st part of the message, the 2nd part of the message, and the ack, respectively.
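Under the same kind of toy bookkeeping (names ours), giving each message part its own request slot lets every posted operation be waited on, so nothing is orphaned:

```cpp
#include <cassert>
#include <set>

struct Runtime {
    std::set<int> live;  // ids of operations whose state is still allocated
    int next_id = 0;
    void post(int &slot) { slot = next_id; live.insert(next_id++); }
    void wait(int slot)  { live.erase(slot); }
};

// With distinct slots for the 1st and 2nd part, the final waits cover both
// operations and no runtime state is left allocated.
int leaked_without_sharing() {
    Runtime rt;
    int req_part1 = -1, req_part2 = -1;
    rt.post(req_part1);  // send 1st part of the message
    rt.post(req_part2);  // send 2nd part, its own request object
    rt.wait(req_part1);  // the final MPI_Waitall covers both requests
    rt.wait(req_part2);
    return (int)rt.live.size();  // 0: nothing orphaned
}
```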
Right now, for every pair of processors that communicate, in exchange_tags, we keep 3 receive requests, and 3 send requests.
These requests are kept in a vector of size 3 * num_procs.
Assume that processor A communicates with processor B; A has index indA in the list of processors that communicate with B, and B has index indB in the list of processors that communicate with A.
buffProcs is member data (a std::vector<int>) that lists the ranks of the processors the local processor communicates with
So, on processor A, buffProcs[indB] = B, and on processor B, buffProcs[indA] = A
Every processor communicates with buffProcs.size() other processors, and for each of those, there is a buffer to send to, and a buffer to receive from:
std::vector<Buffer*> localOwnedBuffs, remoteOwnedBuffs;
buffProcs.size()== localOwnedBuffs.size() == remoteOwnedBuffs.size()
So, to send from A to B:
on task A, buffProcs[indB] = B, and localOwnedBuffs[indB] is used to send to B
on task B, buffProcs[indA] = A, and remoteOwnedBuffs[indA] receives the data from A
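A minimal illustration of this bookkeeping (the concrete ranks are our own choice): a peer's position in a buffProcs-style list selects the matching buffer slot on each side:

```cpp
#include <cassert>
#include <vector>
#include <algorithm>

// Position of a peer rank in a buffProcs-style list.
int index_of(const std::vector<int> &peers, int rank) {
    return (int)(std::find(peers.begin(), peers.end(), rank) - peers.begin());
}

void demo() {
    // Suppose A is rank 2 and B is rank 5 (illustrative values).
    std::vector<int> buffProcs_on_A = {0, 5, 7};  // peers of A
    std::vector<int> buffProcs_on_B = {1, 2, 4};  // peers of B

    int indB = index_of(buffProcs_on_A, 5);  // B's index on A
    int indA = index_of(buffProcs_on_B, 2);  // A's index on B

    assert(buffProcs_on_A[indB] == 5);  // on A: buffProcs[indB] = B
    assert(buffProcs_on_B[indA] == 2);  // on B: buffProcs[indA] = A
    // A sends from localOwnedBuffs[indB];
    // B receives into remoteOwnedBuffs[indA].
}
```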
We keep 2 lists of requests, one for sending, one for receiving, each of size 3*buffProcs.size().
As a reminder, when we wait on requests with MPI_Waitany, the call returns the index of the completed request. We use MPI_Waitany for the receive requests and MPI_Waitall for the send requests.
One list of requests is member data (sendReqs.size() == 3*buffProcs.size() )
The receive requests are local data within exchange_tags method:
std::vector<MPI_Request> recv_tag_reqs(3*buffProcs.size(), MPI_REQUEST_NULL);
(There is no benefit to making sendReqs member data, as those should be local to each communication block)
Each communication phase comprises a set of asynchronous, nonblocking communications. Each phase is collective: a phase needs to be finished on all processes.
We have phases for:
1) resolve shared ents (other than vertices)
2) exchange ghosts: each layer has one phase
3) exchange tags
4) reduce tags
5) settle communication points (part of intersection code, mbcslam; the strategy is the same as for exchange tags)
For communication between A and B, we use on A a total of 6 requests:
sendReqs[3*indB] for sending the first message from A to B, fixed size
sendReqs[3*indB+1] for sending the second message if needed, variable size
sendReqs[3*indB+2] for sending the ack (this will be needed if B needs to send a large message to A)
recv_tag_reqs[3*indB] for receiving the first message from B, fixed size
recv_tag_reqs[3*indB+1] for receiving the second message from B, if needed, variable size
recv_tag_reqs[3*indB+2] for receiving the ack (this will be needed if A needs to send a large message to B)
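Hypothetical helpers (our own names, not in the source) that make the 3-requests-per-peer layout explicit, where ind is the peer's position in buffProcs:

```cpp
#include <cassert>

// Index of each request in the size-3*buffProcs.size() request vectors.
int req_first(int ind)  { return 3 * ind;     }  // 1st message, fixed size
int req_second(int ind) { return 3 * ind + 1; }  // 2nd message, variable size
int req_ack(int ind)    { return 3 * ind + 2; }  // ack
```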
incoming is a local variable, starting at 0; it counts the messages still expected on the local processor (line 7047)
Sending a message from A to B involves this:
1) B posts an irecv from A: (line 7054)
MPI_Irecv(remoteOwnedBuffs[indA]->mem_ptr, INITIAL_BUFF_SIZE, MPI_UNSIGNED_CHAR, A, tag7, COMM_W, &recv_tag_reqs[3*indA] );
incomingB is incremented, because we expect one more message
2) A uses the "send_buffer" method to send the packed message to B (line 7117)
result = send_buffer(B, localOwnedBuffs[indB], tag7, sendReqs[3*indB], recv_tag_reqs[3*indB+2], &dum_ack_buff, incomingA);
we pass incoming by reference, because incoming could be modified in this method
in send_buffer method (defined at line 5579), we do this:
a) if the stored size exceeds INITIAL_BUFF_SIZE, we post a second receive, with the request for the ack
MPI_Irecv(dummy_ack_buff, 4, MPI_UNSIGNED_CHAR, B, tag6, COMM_W, &recv_tag_reqs[3*indB+2] );
b) do the send of the first part of the message
So at this point, after send_buffer completes, only the first part of the message has been sent, not the whole message
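The decision in send_buffer can be sketched as follows (a simplification with our own names; the real method also packs the buffer and posts the actual MPI calls):

```cpp
#include <cassert>

const int INITIAL_BUFF_SIZE = 1024;  // illustrative value

struct SendPlan {
    int first_bytes;  // bytes that go out in the first message
    bool expect_ack;  // true if an ack receive was posted (2nd part follows)
};

// Only when the packed size exceeds INITIAL_BUFF_SIZE is the ack receive
// posted, and only the first INITIAL_BUFF_SIZE bytes go out now.
SendPlan plan_send(int stored_size) {
    if (stored_size > INITIAL_BUFF_SIZE)
        return {INITIAL_BUFF_SIZE, true};  // post Irecv for ack, send part 1
    return {stored_size, false};           // whole message fits in one send
}
```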
3) B is now waiting for message(s) from A:
while (incomingB) do_something; — this could execute up to 3 times for A
B is waiting on any request from recv_tag_reqs vector, for completion
it receives anything from A at the index 3*indA, 3*indA+1, 3*indA+2
it calls then recv_buffer() method at line 7143
recv_buffer(tag7, status, remoteOwnedBuffs[indA],
recv_tag_reqs[3*indA + 1], // this is for receiving the second message
recv_tag_reqs[3*indA + 2], // this would be for ack, but it is not used; consider removing it
sendReqs[3*indA+1], // send request for sending the second message
sendReqs[3*indA+2], // this is for sending the ack
What does recv_buffer do? (line 5638)
a) if the message has tag7, it is the first message received from A;
I) if the buffer to receive needs to be enlarged,
a1) post a second receive with the req for the second message,
a2) then do an isend with the ack, back to A
so A knows it can send the second part of the message
II) if the buffer is fine, we are done, signal done
b) if the message has tag6, it is an ack from A (not the case we study here; it means B should send the second part of its own message to A)
posts an isend for second part of message to A
success = MPI_Isend(localOwnedBuffs[indA]->mem_ptr+INITIAL_BUFF_SIZE, localOwnedBuffs[indA]->get_stored_size() - INITIAL_BUFF_SIZE,
MPI_UNSIGNED_CHAR, from_proc, mesg_tag_expected+1,
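The branch structure of recv_buffer can be summarized as follows (a simplification with our own names; tags 7 and 6 correspond to the tag7/tag6 used above):

```cpp
#include <cassert>
#include <string>

const int MSG_TAG = 7;               // first message from the peer
const int ACK_TAG = 6;               // ack from the peer
const int INITIAL_BUFF_SIZE = 1024;  // illustrative value

// What recv_buffer does next, given the tag of the message just received
// and, for a first message, the total size announced in its header.
std::string next_action(int tag, int total_size) {
    if (tag == MSG_TAG) {
        if (total_size > INITIAL_BUFF_SIZE)               // case a) I)
            return "post recv for 2nd part, isend ack";
        return "done";                                    // case a) II)
    }
    return "isend 2nd part of our message";               // case b): ack
}
```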
In the end, we wait for all posted send requests with MPI_Waitall()
To recapitulate: receive requests are processed after MPI_Waitany, in any order; send requests are simply all waited on at the end.