diff --git a/contrib/Makefile.in b/contrib/Makefile.in index 88d355fb1..8c72764d7 100644 --- a/contrib/Makefile.in +++ b/contrib/Makefile.in @@ -26,7 +26,7 @@ LDFLAGS += -m32 -Wl,-m32 -Wl,-melf_i386 -Wa,--32 endif plugins = -plugins_distclean = ckptfile infiniband +plugins_distclean = ckptfile infiniband mpi-proxy-split ifeq ($(INFINIBAND_SUPPORT),yes) plugins += infiniband # ib2tcp diff --git a/contrib/mpi-proxy-split/mpi-wrappers/.gitignore b/contrib/mpi-proxy-split/mpi-wrappers/.gitignore new file mode 100644 index 000000000..a21e0c6f5 --- /dev/null +++ b/contrib/mpi-proxy-split/mpi-wrappers/.gitignore @@ -0,0 +1,2 @@ +mana_p2p_update_logs +p2p-deterministic.h diff --git a/contrib/mpi-proxy-split/p2p_drain_send_recv.cpp b/contrib/mpi-proxy-split/p2p_drain_send_recv.cpp index 0a2f4a37a..aa455806b 100644 --- a/contrib/mpi-proxy-split/p2p_drain_send_recv.cpp +++ b/contrib/mpi-proxy-split/p2p_drain_send_recv.cpp @@ -97,8 +97,11 @@ recvMsgIntoInternalBuffer(MPI_Status status, MPI_Comm comm) MPI_Type_size(MPI_BYTE, &size); JASSERT(size == 1); void *buf = JALLOC_HELPER_MALLOC(count); - MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, + int rc = MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE); + if (rc != MPI_SUCCESS) { + fprintf(stderr, "MPI_Recv failed\n"); + } mpi_message_t *message = (mpi_message_t *)JALLOC_HELPER_MALLOC(sizeof(mpi_message_t)); message->buf = buf; @@ -116,7 +119,7 @@ recvMsgIntoInternalBuffer(MPI_Status status, MPI_Comm comm) // Go through each MPI_Irecv in the g_async_calls map and try to complete // the MPI_Irecv before checkpointing. -int +static int completePendingIrecvs() { int bytesReceived = 0; @@ -135,6 +138,7 @@ completePendingIrecvs() call->comm); g_recvBytesByRank[worldRank] += call->count * size; bytesReceived += call->count * size; + JALLOC_HELPER_FREE(call); it = g_async_calls.erase(it); } else { /* We update the iterator even if the MPI_Test fails. @@ -162,7 +166,7 @@ completePendingIrecvs() return bytesReceived; } -int +static int recvFromAllComms() { int bytesReceived = 0; @@ -190,17 +194,16 @@ recvFromAllComms() return bytesReceived; } -void +static void removePendingSendRequests() { dmtcp::map::iterator it; for (it = g_async_calls.begin(); it != g_async_calls.end();) { MPI_Request request = it->first; mpi_async_call_t *call = it->second; - int flag = 0; if (call->type == ISEND_REQUEST) { UPDATE_REQUEST_MAP(request, MPI_REQUEST_NULL); - // FIXME: We should free `call' to avoid memory leak + JALLOC_HELPER_FREE(call); it = g_async_calls.erase(it); } else { it++; diff --git a/contrib/mpi-proxy-split/p2p_drain_send_recv.h b/contrib/mpi-proxy-split/p2p_drain_send_recv.h index 3a4b63f13..d4c160243 100644 --- a/contrib/mpi-proxy-split/p2p_drain_send_recv.h +++ b/contrib/mpi-proxy-split/p2p_drain_send_recv.h @@ -37,14 +37,12 @@ extern dmtcp::vector g_message_queue; void initialize_drain_send_recv(); void registerLocalSendsAndRecvs(); void drainSendRecv(); -int recvFromAllComms(int source); int recvMsgIntoInternalBuffer(MPI_Status status); bool isBufferedPacket(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status); int consumeBufferedPacket(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *mpi_status, int size); -void removePendingSendRequests(); void resetDrainCounters(); int localRankToGlobalRank(int localRank, MPI_Comm localComm); #endif diff --git a/contrib/mpi-proxy-split/p2p_log_replay.cpp b/contrib/mpi-proxy-split/p2p_log_replay.cpp index ac1c85502..014520744 100644 --- a/contrib/mpi-proxy-split/p2p_log_replay.cpp +++ b/contrib/mpi-proxy-split/p2p_log_replay.cpp @@ -128,7 +128,6 @@ logRequestInfo(MPI_Request request, mpi_req_t req_type) request_info_t* lookupRequestInfo(MPI_Request request) { - request_info_t *req_info; std::unordered_map::iterator it; it = request_log.find(request); if (it != request_log.end()) { diff --git a/contrib/mpi-proxy-split/split_process.cpp b/contrib/mpi-proxy-split/split_process.cpp index e31b07db3..7141ad3a6 100644 --- a/contrib/mpi-proxy-split/split_process.cpp +++ b/contrib/mpi-proxy-split/split_process.cpp @@ -247,12 +247,10 @@ read_lh_proxy_bits(pid_t childpid) // NOTE: This requires same privilege as ptrace_attach (valid for child). // Anecdotally, in containers, we've seen a case where this errors out // with ESRCH (no such proc.); it may need CAP_SYS_PTRACE privilege?? - for (int i = 0; i < IOV_SZ; i++) { - JTRACE("Reading segment from lh_proxy") - (remote_iov[i].iov_base)(remote_iov[i].iov_len); - ret = process_vm_readv(childpid, remote_iov + i, 1, remote_iov + i, 1, 0); - JASSERT(ret != -1)(JASSERT_ERRNO).Text("Error reading data from lh_proxy"); - } + JTRACE("Reading segment from lh_proxy") + (remote_iov[0].iov_base)(remote_iov[0].iov_len); + ret = process_vm_readv(childpid, remote_iov, IOV_SZ, remote_iov, IOV_SZ, 0); + JASSERT(ret != -1)(JASSERT_ERRNO).Text("Error reading data from lh_proxy"); // Can remove PROT_WRITE now that we've populated the text segment. ret = mprotect((void *)ROUND_DOWN(remote_iov[0].iov_base), @@ -333,7 +331,7 @@ startProxy() // Read from stdout of lh_proxy full lh_info struct, including orig memRange. close(pipefd_out[1]); // close write end of pipe // FIXME: should be a readall. Check for return error code. - if (read(pipefd_out[0], &lh_info, sizeof lh_info) < sizeof lh_info) { + if (read(pipefd_out[0], &lh_info, sizeof lh_info) < (ssize_t) sizeof lh_info) { JWARNING(false)(JASSERT_ERRNO) .Text("Read fewer bytes than expected"); break; } @@ -352,7 +350,6 @@ findLHMemRange(MemRange_t *lh_mem_range) bool is_set = false; Area area; - char prev_path_name[PATH_MAX]; char next_path_name[PATH_MAX]; uint64_t prev_addr_end; uint64_t next_addr_start;