diff options
-rw-r--r-- | perform/pio_engine.c | 125 |
1 files changed, 86 insertions, 39 deletions
diff --git a/perform/pio_engine.c b/perform/pio_engine.c index 9bc026a..e35276b 100644 --- a/perform/pio_engine.c +++ b/perform/pio_engine.c @@ -35,13 +35,13 @@ #define GOTOERROR(errcode) { ret_code = errcode; goto done; } #define GOTODONE { goto done; } #define ERRMSG(mesg) { \ - fprintf(stderr, "Proc %d: ", myrank); \ + fprintf(stderr, "Proc %d: ", pio_mpi_rank_g); \ fprintf(stderr, "*** Assertion failed (%s) at line %4d in %s\n", \ mesg, (int)__LINE__, __FILE__); \ } #define MSG(mesg) { \ - fprintf(stderr, "Proc %d: ", myrank); \ + fprintf(stderr, "Proc %d: ", pio_mpi_rank_g); \ fprintf(stderr, "(%s) at line %4d in %s\n", \ mesg, (int)__LINE__, __FILE__); \ } @@ -92,6 +92,17 @@ enum { PIO_READ = 4 }; +/* Global variables */ +MPI_Comm pio_comm_g; /* Communicator to run the PIO */ +int pio_mpi_rank_g; /* MPI rank of pio_comm_g */ +int pio_mpi_nprocs_g; /* number of processes of pio_comm_g */ + +static int clean_file_g = -1; /*whether to cleanup temporary test */ + /*files. -1 is not defined; */ + /*0 is no cleanup; 1 is do cleanup */ + + + /* * In a parallel machine, the filesystem suitable for compiling is * unlikely a parallel file system that is suitable for parallel I/O. @@ -122,10 +133,11 @@ typedef union _file_descr { static char *pio_create_filename(iotype iot, const char *base_name, char *fullname, size_t size); static herr_t do_write(file_descr *fd, iotype iot, long ndsets, - long nelmts, long buf_size, char *buffer, MPI_Comm comm); + long nelmts, long buf_size, char *buffer); static herr_t do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, - int flags, MPI_Comm comm); + int flags); static herr_t do_fclose(iotype iot, file_descr *fd); +static void do_cleanupfile(char *fname); results do_pio(parameters param) @@ -151,9 +163,10 @@ do_pio(parameters param) herr_t hrc; /*HDF5 return code */ /* MPI variables */ - MPI_Comm comm = MPI_COMM_NULL; int myrank, nprocs = 1; + pio_comm_g = MPI_COMM_NULL; + /* Sanity check parameters */ /* IO type */ @@ -215,7 +228,7 @@ do_pio(parameters param) GOTOERROR(FAIL); } -#if 0 +#if 1 /* DEBUG*/ fprintf(stderr, "nfiles=%d\n", nfiles); fprintf(stderr, "ndsets=%ld\n", ndsets); @@ -225,14 +238,16 @@ fprintf(stderr, "maxprocs=%d\n", maxprocs); fprintf(stderr, "buffer size=%ld\n", buf_size); nfiles=MIN(3, nfiles); ndsets=MIN(5, ndsets); +/*nelmts=MIN(1000, nelmts);*/ +buf_size=MIN(1024*1024, buf_size); /* DEBUG END */ #endif - /* Create a sub communicator for this run. Easier to use the first N + /* Create a sub communicator for this PIO run. Easier to use the first N * processes. */ MPI_Comm_rank(MPI_COMM_WORLD, &myrank); color = (myrank < maxprocs); - mrc = MPI_Comm_split(MPI_COMM_WORLD, color, myrank, &comm); + mrc = MPI_Comm_split(MPI_COMM_WORLD, color, myrank, &pio_comm_g); if (mrc != MPI_SUCCESS) { fprintf(stderr, "MPI_Comm_split failed\n"); @@ -241,13 +256,13 @@ ndsets=MIN(5, ndsets); if (!color){ /* not involved in this run */ - mrc = MPI_Comm_free(&comm); + mrc = MPI_Comm_free(&pio_comm_g); GOTODONE; } - /* determine the mpi rank of in the new comm */ - MPI_Comm_size(comm, &nprocs); - MPI_Comm_rank(comm, &myrank); + /* determine the mpi rank of in the PIO communicator */ + MPI_Comm_size(pio_comm_g, &pio_mpi_nprocs_g); + MPI_Comm_rank(pio_comm_g, &pio_mpi_rank_g); /* allocate data buffer */ buffer = malloc(buf_size); @@ -270,13 +285,13 @@ ndsets=MIN(5, ndsets); fprintf(stderr, "filename=%s\n", fname); set_time(res.timers, HDF5_FILE_OPENCLOSE, START); - hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE, comm); + hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE); set_time(res.timers, HDF5_FILE_OPENCLOSE, STOP); VRFY((hrc == SUCCESS), "do_fopen failed"); set_time(res.timers, HDF5_WRITE_FIXED_DIMS, START); - hrc = do_write(&fd, iot, ndsets, nelmts, buf_size, buffer, comm); + hrc = do_write(&fd, iot, ndsets, nelmts, buf_size, buffer); set_time(res.timers, HDF5_WRITE_FIXED_DIMS, STOP); VRFY((hrc == SUCCESS), "do_write failed"); @@ -292,7 +307,7 @@ fprintf(stderr, "filename=%s\n", fname); * Read performance measurement */ /* Open file for read */ - hrc = do_fopen(iot, fname, &fd, PIO_READ, comm); + hrc = do_fopen(iot, fname, &fd, PIO_READ); VRFY((hrc == SUCCESS), "do_fopen failed"); /* @@ -304,7 +319,7 @@ fprintf(stderr, "filename=%s\n", fname); /* Close file for read */ hrc = do_fclose(iot, &fd); VRFY((hrc == SUCCESS), "do_fclose failed"); - remove(fname); + do_cleanupfile(fname); } done: @@ -329,8 +344,8 @@ done: } /* release MPI resources */ - if (comm != MPI_COMM_NULL){ - mrc = MPI_Comm_free(&comm); + if (pio_comm_g != MPI_COMM_NULL){ + mrc = MPI_Comm_free(&pio_comm_g); if (mrc != MPI_SUCCESS) { fprintf(stderr, "MPI_Comm_free failed\n"); ret_code = FAIL; @@ -463,7 +478,7 @@ pio_create_filename(iotype iot, const char *base_name, char *fullname, size_t si */ static herr_t do_write(file_descr *fd, iotype iot, long ndsets, - long nelmts, long buf_size, char *buffer, MPI_Comm comm) + long nelmts, long buf_size, char *buffer) { int ret_code = SUCCESS; int rc; /*routine return code */ @@ -474,9 +489,11 @@ do_write(file_descr *fd, iotype iot, long ndsets, long nelmts_towrite, nelmts_written; char dname[64]; off_t dset_offset; /*dataset offset in a file */ + off_t file_offset; /*file offset of the next transfer */ long dset_size; /*one dataset size in bytes */ long nelmts_in_buf; - int myrank, nprocs = 1; + long elmts_begin; /*first elmt this process transfer */ + long elmts_count; /*number of elmts this process transfer */ /* HDF5 variables */ herr_t hrc; /*HDF5 return code */ @@ -485,10 +502,6 @@ do_write(file_descr *fd, iotype iot, long ndsets, hid_t h5mem_space_id = -1; /*memory dataspace ID */ hid_t h5ds_id = -1; /* dataset handle */ - /* determine the mpi rank of in this comm */ - MPI_Comm_size(comm, &nprocs); - MPI_Comm_rank(comm, &myrank); - /* calculate dataset parameters. data type is always native C int */ dset_size = nelmts * ELMT_SIZE; nelmts_in_buf = buf_size/ELMT_SIZE; @@ -531,17 +544,33 @@ do_write(file_descr *fd, iotype iot, long ndsets, break; } + /* Calculate the first element and how many elements this process + * transfer. First calculate the beginning element of this process + * and the next process. Count of elements is the difference between + * these two beginnings. This way, it avoids any rounding errors. + */ + elmts_begin = (nelmts * 1.0)/pio_mpi_nprocs_g*pio_mpi_rank_g; + if (pio_mpi_rank_g < (pio_mpi_nprocs_g - 1)){ + elmts_count = ((nelmts * 1.0)/pio_mpi_nprocs_g*(pio_mpi_rank_g+1)) - elmts_begin; + }else{ + /* last process. Take whatever are left */ + elmts_count = nelmts - elmts_begin; + } +fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n", + pio_mpi_rank_g, elmts_begin, elmts_count); + nelmts_written = 0 ; - while (nelmts_written < nelmts){ - nelmts_towrite = nelmts - nelmts_written; + while (nelmts_written < elmts_count){ + nelmts_towrite = elmts_count - nelmts_written; - if (nelmts - nelmts_written >= nelmts_in_buf) { + if (elmts_count - nelmts_written >= nelmts_in_buf) { nelmts_towrite = nelmts_in_buf; } else { /* last write of a partial buffer */ - nelmts_towrite = nelmts - nelmts_written; + nelmts_towrite = elmts_count - nelmts_written; } + file_offset = dset_offset + (elmts_begin + nelmts_written)*ELMT_SIZE; /*Prepare write data*/ { @@ -553,18 +582,19 @@ do_write(file_descr *fd, iotype iot, long ndsets, } /* Write */ - +fprintf(stderr, "proc %d: writes %ld bytes at file-offset %ld\n", + pio_mpi_rank_g, nelmts_towrite*ELMT_SIZE, file_offset); /* Calculate offset of write within a dataset/file */ switch (iot){ case RAW: - rc = RAWSEEK(fd->rawfd, dset_offset + nelmts_written*ELMT_SIZE); + rc = RAWSEEK(fd->rawfd, file_offset); VRFY((rc>=0), "RAWSEEK"); rc = RAWWRITE(fd->rawfd, buffer, nelmts_towrite*ELMT_SIZE); VRFY((rc==(nelmts_towrite*ELMT_SIZE)), "RAWWRITE"); break; case MPIO: - mpi_offset = dset_offset; + mpi_offset = file_offset; mrc = MPI_File_write_at(fd->mpifd, mpi_offset, buffer, nelmts_towrite*ELMT_SIZE, MPI_CHAR, &mpi_status); VRFY((mrc==MPI_SUCCESS), "MPIO_WRITE"); @@ -624,8 +654,7 @@ done: * Modifications: */ static herr_t -do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags, - MPI_Comm comm) +do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags) { int ret_code = SUCCESS, mrc; herr_t hrc; @@ -633,7 +662,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags, switch (iot) { case RAW: - if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) { + if (flags & (PIO_CREATE | PIO_WRITE)) { fd->rawfd = RAWCREATE(fname); } else { fd->rawfd = RAWOPEN(fname, O_RDONLY); @@ -647,11 +676,11 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags, break; case MPIO: - if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) { - mrc = MPI_File_open(comm, fname, MPI_MODE_CREATE | MPI_MODE_RDWR, + if (flags & (PIO_CREATE | PIO_WRITE)) { + mrc = MPI_File_open(pio_comm_g, fname, MPI_MODE_CREATE | MPI_MODE_RDWR, MPI_INFO_NULL, &(fd->mpifd)); } else { - mrc = MPI_File_open(comm, fname, MPI_MODE_RDONLY, + mrc = MPI_File_open(pio_comm_g, fname, MPI_MODE_RDONLY, MPI_INFO_NULL, &(fd->mpifd)); } @@ -670,7 +699,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags, GOTOERROR(FAIL); } - hrc = H5Pset_fapl_mpio(acc_tpl, comm, MPI_INFO_NULL); + hrc = H5Pset_fapl_mpio(acc_tpl, pio_comm_g, MPI_INFO_NULL); if (hrc < 0) { fprintf(stderr, "HDF5 Property List Set failed\n"); @@ -678,7 +707,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags, } /* create the parallel file */ - if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) { + if (flags & (PIO_CREATE | PIO_WRITE)) { fd->h5fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, acc_tpl); } else { fd->h5fd = H5Fopen(fname, H5P_DEFAULT, acc_tpl); @@ -756,4 +785,22 @@ done: return ret_code; } + +/* + * Cleanup temporary file, fname unless HDF5_NOCLEANUP is set. + * Only Proc 0 of the PIO communicator will do the cleanup. Other processes + * just returns. + */ +static void do_cleanupfile(char *fname) +{ + if (pio_mpi_rank_g != 0) + return; + + if (clean_file_g == -1) + clean_file_g = (getenv("HDF5_NOCLEANUP")==NULL) ? 1 : 0; + +fprintf(stderr, "clean_file_g=%d\n", clean_file_g); + if (clean_file_g) + remove(fname); +} #endif /* H5_HAVE_PARALLEL */ |