diff options
Diffstat (limited to 'perform/pio_engine.c')
-rw-r--r-- | perform/pio_engine.c | 327 |
1 files changed, 302 insertions, 25 deletions
diff --git a/perform/pio_engine.c b/perform/pio_engine.c index e35276b..0136a7f 100644 --- a/perform/pio_engine.c +++ b/perform/pio_engine.c @@ -133,12 +133,21 @@ typedef union _file_descr { static char *pio_create_filename(iotype iot, const char *base_name, char *fullname, size_t size); static herr_t do_write(file_descr *fd, iotype iot, long ndsets, - long nelmts, long buf_size, char *buffer); + long nelmts, long buf_size, void *buffer); +static herr_t do_read(file_descr *fd, iotype iot, long ndsets, + long nelmts, long buf_size, void *buffer /*out*/); static herr_t do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags); static herr_t do_fclose(iotype iot, file_descr *fd); static void do_cleanupfile(char *fname); +/* + * Function: do_pio + * Purpose: PIO Engine where Parallel IO are executed. + * Return: results + * Programmer: Albert Cheng, Bill Wendling 2001/12/12 + * Modifications: + */ results do_pio(parameters param) { @@ -228,7 +237,7 @@ do_pio(parameters param) GOTOERROR(FAIL); } -#if 1 +#if AKCDEBUG /* DEBUG*/ fprintf(stderr, "nfiles=%d\n", nfiles); fprintf(stderr, "ndsets=%ld\n", ndsets); @@ -236,8 +245,9 @@ fprintf(stderr, "nelmts=%ld\n", nelmts); fprintf(stderr, "niters=%d\n", niters); fprintf(stderr, "maxprocs=%d\n", maxprocs); fprintf(stderr, "buffer size=%ld\n", buf_size); +fprintf(stderr, "total data size=%ld\n", ndsets*nelmts*sizeof(int)); nfiles=MIN(3, nfiles); -ndsets=MIN(5, ndsets); +/*ndsets=MIN(5, ndsets);*/ /*nelmts=MIN(1000, nelmts);*/ buf_size=MIN(1024*1024, buf_size); /* DEBUG END */ @@ -282,7 +292,9 @@ buf_size=MIN(1024*1024, buf_size); sprintf(base_name, "#pio_tmp_%u", nf); pio_create_filename(iot, base_name, fname, sizeof(fname)); +#if AKCDEBUG fprintf(stderr, "filename=%s\n", fname); +#endif set_time(res.timers, HDF5_FILE_OPENCLOSE, START); hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE); @@ -310,11 +322,8 @@ fprintf(stderr, "filename=%s\n", fname); hrc = do_fopen(iot, fname, &fd, PIO_READ); VRFY((hrc == SUCCESS), "do_fopen failed"); - /* - hrc = do_read(&fd, iot, ndsets, nelmts, buffer); + hrc = do_read(&fd, iot, ndsets, nelmts, buf_size, buffer); VRFY((hrc == SUCCESS), "do_read failed"); - */ - /* Close file for read */ hrc = do_fclose(iot, &fd); @@ -473,12 +482,12 @@ pio_create_filename(iotype iot, const char *base_name, char *fullname, size_t si * Function: do_write * Purpose: Write the required amount of data to the file. * Return: SUCCESS or FAIL - * Programmer: Bill Wendling, 14. November 2001 + * Programmer: Albert Cheng, Bill Wendling, 2001/12/13 * Modifications: */ static herr_t do_write(file_descr *fd, iotype iot, long ndsets, - long nelmts, long buf_size, char *buffer) + long nelmts, long buf_size, void *buffer) { int ret_code = SUCCESS; int rc; /*routine return code */ @@ -502,18 +511,24 @@ do_write(file_descr *fd, iotype iot, long ndsets, hid_t h5mem_space_id = -1; /*memory dataspace ID */ hid_t h5ds_id = -1; /* dataset handle */ +#if AKCDEBUG +fprintf(stderr, "In do_write\n"); +fprintf(stderr, "ndsets=%ld\n", ndsets); +fprintf(stderr, "nelmts=%ld\n", nelmts); +fprintf(stderr, "buffer size=%ld\n", buf_size); +#endif /* calculate dataset parameters. data type is always native C int */ dset_size = nelmts * ELMT_SIZE; nelmts_in_buf = buf_size/ELMT_SIZE; - /* hdf5 dataset setup */ + /* hdf5 data space setup */ if (iot == PHDF5){ /* define a contiquous dataset of nelmts native ints */ h5dims[0] = nelmts; h5dset_space_id = H5Screate_simple(1, h5dims, NULL); VRFY((h5dset_space_id >= 0), "H5Screate_simple"); - /* create the memory dataspace */ + /* create the memory dataspace that corresponds to the xfer buffer */ h5dims[0] = nelmts_in_buf; h5mem_space_id = H5Screate_simple(1, h5dims, NULL); VRFY((h5mem_space_id >= 0), "H5Screate_simple"); @@ -549,18 +564,21 @@ do_write(file_descr *fd, iotype iot, long ndsets, * and the next process. Count of elements is the difference between * these two beginnings. This way, it avoids any rounding errors. */ - elmts_begin = (nelmts * 1.0)/pio_mpi_nprocs_g*pio_mpi_rank_g; + elmts_begin = (nelmts*1.0)/pio_mpi_nprocs_g*pio_mpi_rank_g; if (pio_mpi_rank_g < (pio_mpi_nprocs_g - 1)){ - elmts_count = ((nelmts * 1.0)/pio_mpi_nprocs_g*(pio_mpi_rank_g+1)) - elmts_begin; + elmts_count = ((nelmts*1.0)/pio_mpi_nprocs_g*(pio_mpi_rank_g+1)) - + elmts_begin; }else{ /* last process. Take whatever are left */ elmts_count = nelmts - elmts_begin; } + +#if AKCDEBUG fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n", pio_mpi_rank_g, elmts_begin, elmts_count); +#endif nelmts_written = 0 ; - while (nelmts_written < elmts_count){ nelmts_towrite = elmts_count - nelmts_written; @@ -570,8 +588,8 @@ fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n", /* last write of a partial buffer */ nelmts_towrite = elmts_count - nelmts_written; } - file_offset = dset_offset + (elmts_begin + nelmts_written)*ELMT_SIZE; +#if AKCDEBUG /*Prepare write data*/ { int *intptr = (int *)buffer; @@ -580,13 +598,18 @@ fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n", for (i = 0; i < nelmts_towrite; ++i) *intptr++ = nelmts_towrite + i; } +#endif /* Write */ -fprintf(stderr, "proc %d: writes %ld bytes at file-offset %ld\n", - pio_mpi_rank_g, nelmts_towrite*ELMT_SIZE, file_offset); /* Calculate offset of write within a dataset/file */ switch (iot){ case RAW: + file_offset = dset_offset + + (elmts_begin + nelmts_written)*ELMT_SIZE; +#if AKCDEBUG +fprintf(stderr, "proc %d: writes %ld bytes at file-offset %ld\n", + pio_mpi_rank_g, nelmts_towrite*ELMT_SIZE, file_offset); +#endif rc = RAWSEEK(fd->rawfd, file_offset); VRFY((rc>=0), "RAWSEEK"); rc = RAWWRITE(fd->rawfd, buffer, nelmts_towrite*ELMT_SIZE); @@ -594,12 +617,41 @@ fprintf(stderr, "proc %d: writes %ld bytes at file-offset %ld\n", break; case MPIO: - mpi_offset = file_offset; + mpi_offset = dset_offset + + (elmts_begin + nelmts_written)*ELMT_SIZE; +#if AKCDEBUG +fprintf(stderr, "proc %d: writes %ld bytes at mpi-offset %ld\n", + pio_mpi_rank_g, nelmts_towrite*ELMT_SIZE, mpi_offset); +#endif mrc = MPI_File_write_at(fd->mpifd, mpi_offset, buffer, nelmts_towrite*ELMT_SIZE, MPI_CHAR, &mpi_status); VRFY((mrc==MPI_SUCCESS), "MPIO_WRITE"); break; case PHDF5: + /*set up the dset space id to select the segment to process */ + { + hsize_t block[1], stride[1], count[1]; + hssize_t start[1]; + + start[0] = elmts_begin + nelmts_written; + stride[0] = block[0] = nelmts_towrite; + count[0] = 1; + hrc = H5Sselect_hyperslab(h5dset_space_id, H5S_SELECT_SET, + start, stride, count, block); + VRFY((hrc >= 0), "H5Sset_hyperslab"); + + /*setup the memory space id too. Only start is different */ + start[0] = 0; + hrc = H5Sselect_hyperslab(h5mem_space_id, H5S_SELECT_SET, + start, stride, count, block); + VRFY((hrc >= 0), "H5Sset_hyperslab"); + } + MPI_Barrier(pio_comm_g); + + /* set write time here */ + hrc = H5Dwrite(h5ds_id, H5T_NATIVE_INT, h5mem_space_id, + h5dset_space_id, H5P_DEFAULT, buffer); + VRFY((hrc >= 0), "H5Dwrite"); break; } @@ -646,11 +698,232 @@ done: return ret_code; } + +/* + * Function: do_read + * Purpose: read the required amount of data from the file. + * Return: SUCCESS or FAIL + * Programmer: Albert Cheng 2001/12/13 + * Modifications: + */ +static herr_t +do_read(file_descr *fd, iotype iot, long ndsets, + long nelmts, long buf_size, void *buffer /*out*/) +{ + int ret_code = SUCCESS; + int rc; /*routine return code */ + int mrc; /*MPI return code */ + MPI_Offset mpi_offset; + MPI_Status mpi_status; + long ndset; + long nelmts_toread, nelmts_read; + char dname[64]; + off_t dset_offset; /*dataset offset in a file */ + off_t file_offset; /*file offset of the next transfer */ + long dset_size; /*one dataset size in bytes */ + long nelmts_in_buf; + long elmts_begin; /*first elmt this process transfer */ + long elmts_count; /*number of elmts this process transfer */ + + /* HDF5 variables */ + herr_t hrc; /*HDF5 return code */ + hsize_t h5dims[1]; /*dataset dim sizes */ + hid_t h5dset_space_id = -1; /*dataset space ID */ + hid_t h5mem_space_id = -1; /*memory dataspace ID */ + hid_t h5ds_id = -1; /* dataset handle */ + +#if AKCDEBUG +fprintf(stderr, "In do_read\n"); +fprintf(stderr, "ndsets=%ld\n", ndsets); +fprintf(stderr, "nelmts=%ld\n", nelmts); +fprintf(stderr, "buffer size=%ld\n", buf_size); +#endif + /* calculate dataset parameters. data type is always native C int */ + dset_size = nelmts * ELMT_SIZE; + nelmts_in_buf = buf_size/ELMT_SIZE; + + /* hdf5 data space setup */ + if (iot == PHDF5){ + /* define a contiquous dataset of nelmts native ints */ + h5dims[0] = nelmts; + h5dset_space_id = H5Screate_simple(1, h5dims, NULL); + VRFY((h5dset_space_id >= 0), "H5Screate_simple"); + + /* create the memory dataspace that corresponds to the xfer buffer */ + h5dims[0] = nelmts_in_buf; + h5mem_space_id = H5Screate_simple(1, h5dims, NULL); + VRFY((h5mem_space_id >= 0), "H5Screate_simple"); + } + + for (ndset = 1; ndset <= ndsets; ++ndset) { + + /* Calculate dataset offset within a file */ + + /* create dataset */ + switch (iot) { + case RAW: + case MPIO: + /* both raw and mpi io just need dataset offset in file*/ + dset_offset = (ndset - 1) * dset_size; + break; + + case PHDF5: + sprintf(dname, "Dataset_%ld", ndset); + h5ds_id = H5Dopen(fd->h5fd, dname); + if (h5ds_id < 0) { + fprintf(stderr, "HDF5 Dataset open failed\n"); + GOTOERROR(FAIL); + } + + break; + } + + /* Calculate the first element and how many elements this process + * transfer. First calculate the beginning element of this process + * and the next process. Count of elements is the difference between + * these two beginnings. This way, it avoids any rounding errors. + */ + elmts_begin = (nelmts*1.0)/pio_mpi_nprocs_g*pio_mpi_rank_g; + if (pio_mpi_rank_g < (pio_mpi_nprocs_g - 1)){ + elmts_count = ((nelmts*1.0)/pio_mpi_nprocs_g*(pio_mpi_rank_g+1)) - + elmts_begin; + }else{ + /* last process. Take whatever are left */ + elmts_count = nelmts - elmts_begin; + } + +#if AKCDEBUG +fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n", + pio_mpi_rank_g, elmts_begin, elmts_count); +#endif + + nelmts_read = 0 ; + while (nelmts_read < elmts_count){ + nelmts_toread = elmts_count - nelmts_read; + + if (elmts_count - nelmts_read >= nelmts_in_buf) { + nelmts_toread = nelmts_in_buf; + } else { + /* last read of a partial buffer */ + nelmts_toread = elmts_count - nelmts_read; + } + + /* read */ + /* Calculate offset of read within a dataset/file */ + switch (iot){ + case RAW: + file_offset = dset_offset + + (elmts_begin + nelmts_read)*ELMT_SIZE; +#if AKCDEBUG +fprintf(stderr, "proc %d: read %ld bytes at file-offset %ld\n", + pio_mpi_rank_g, nelmts_toread*ELMT_SIZE, file_offset); +#endif + rc = RAWSEEK(fd->rawfd, file_offset); + VRFY((rc>=0), "RAWSEEK"); + rc = RAWREAD(fd->rawfd, buffer, nelmts_toread*ELMT_SIZE); + VRFY((rc==(nelmts_toread*ELMT_SIZE)), "RAWREAD"); + break; + + case MPIO: + mpi_offset = dset_offset + + (elmts_begin + nelmts_read)*ELMT_SIZE; +#if AKCDEBUG +fprintf(stderr, "proc %d: read %ld bytes at mpi-offset %ld\n", + pio_mpi_rank_g, nelmts_toread*ELMT_SIZE, mpi_offset); +#endif + mrc = MPI_File_read_at(fd->mpifd, mpi_offset, buffer, + nelmts_toread*ELMT_SIZE, MPI_CHAR, &mpi_status); + VRFY((mrc==MPI_SUCCESS), "MPIO_read"); + break; + case PHDF5: + /*set up the dset space id to select the segment to process */ + { + hsize_t block[1], stride[1], count[1]; + hssize_t start[1]; + + start[0] = elmts_begin + nelmts_read; + stride[0] = block[0] = nelmts_toread; + count[0] = 1; + hrc = H5Sselect_hyperslab(h5dset_space_id, H5S_SELECT_SET, + start, stride, count, block); + VRFY((hrc >= 0), "H5Sset_hyperslab"); + + /*setup the memory space id too. Only start is different */ + start[0] = 0; + hrc = H5Sselect_hyperslab(h5mem_space_id, H5S_SELECT_SET, + start, stride, count, block); + VRFY((hrc >= 0), "H5Sset_hyperslab"); + } + MPI_Barrier(pio_comm_g); + + /* set read time here */ + hrc = H5Dread(h5ds_id, H5T_NATIVE_INT, h5mem_space_id, + h5dset_space_id, H5P_DEFAULT, buffer); + VRFY((hrc >= 0), "H5Dread"); + break; + } + +#if AKCDEBUG & 0 + /*verify read data*/ + { + int *intptr = (int *)buffer; + register int i; + + for (i = 0; i < nelmts_towrite; ++i) + /* TO BE IMPLEMENTED */ + ; + } +#endif + + nelmts_read += nelmts_toread; + } + + /* Calculate read time */ + + /* Close dataset. Only HDF5 needs to do an explicit close. */ + if (iot == PHDF5){ + hrc = H5Dclose(h5ds_id); + + if (hrc < 0) { + fprintf(stderr, "HDF5 Dataset Close failed\n"); + GOTOERROR(FAIL); + } + + h5ds_id = -1; + } + } + +done: + /* release HDF5 objects */ + if (h5dset_space_id != -1) { + hrc = H5Sclose(h5dset_space_id); + if (hrc < 0){ + fprintf(stderr, "HDF5 Dataset Space Close failed\n"); + ret_code = FAIL; + } else { + h5dset_space_id = -1; + } + } + + if (h5mem_space_id != -1) { + hrc = H5Sclose(h5mem_space_id); + if (hrc < 0) { + fprintf(stderr, "HDF5 Memory Space Close failed\n"); + ret_code = FAIL; + } else { + h5mem_space_id = -1; + } + } + + return ret_code; +} + + /* * Function: do_fopen * Purpose: Open the specified file. * Return: SUCCESS or FAIL - * Programmer: Bill Wendling, 14. November 2001 + * Programmer: Albert Cheng, Bill Wendling, 2001/12/13 * Modifications: */ static herr_t @@ -737,7 +1010,7 @@ done: * Function: do_fclose * Purpose: Close the specified file descriptor. * Return: SUCCESS or FAIL - * Programmer: Bill Wendling, 14. November 2001 + * Programmer: Albert Cheng, Bill Wendling, 2001/12/13 * Modifications: */ static herr_t @@ -787,11 +1060,16 @@ done: /* - * Cleanup temporary file, fname unless HDF5_NOCLEANUP is set. - * Only Proc 0 of the PIO communicator will do the cleanup. Other processes - * just returns. + * Function: do_fclose + * Purpose: Cleanup temporary file unless HDF5_NOCLEANUP is set. + * Only Proc 0 of the PIO communicator will do the cleanup. + * Other processes just return. + * Return: void + * Programmer: Albert Cheng 2001/12/12 + * Modifications: */ -static void do_cleanupfile(char *fname) +static void +do_cleanupfile(char *fname) { if (pio_mpi_rank_g != 0) return; @@ -799,7 +1077,6 @@ static void do_cleanupfile(char *fname) if (clean_file_g == -1) clean_file_g = (getenv("HDF5_NOCLEANUP")==NULL) ? 1 : 0; -fprintf(stderr, "clean_file_g=%d\n", clean_file_g); if (clean_file_g) remove(fname); } |