summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--perform/pio_engine.c125
1 files changed, 86 insertions, 39 deletions
diff --git a/perform/pio_engine.c b/perform/pio_engine.c
index 9bc026a..e35276b 100644
--- a/perform/pio_engine.c
+++ b/perform/pio_engine.c
@@ -35,13 +35,13 @@
#define GOTOERROR(errcode) { ret_code = errcode; goto done; }
#define GOTODONE { goto done; }
#define ERRMSG(mesg) { \
- fprintf(stderr, "Proc %d: ", myrank); \
+ fprintf(stderr, "Proc %d: ", pio_mpi_rank_g); \
fprintf(stderr, "*** Assertion failed (%s) at line %4d in %s\n", \
mesg, (int)__LINE__, __FILE__); \
}
#define MSG(mesg) { \
- fprintf(stderr, "Proc %d: ", myrank); \
+ fprintf(stderr, "Proc %d: ", pio_mpi_rank_g); \
fprintf(stderr, "(%s) at line %4d in %s\n", \
mesg, (int)__LINE__, __FILE__); \
}
@@ -92,6 +92,17 @@ enum {
PIO_READ = 4
};
+/* Global variables */
+MPI_Comm pio_comm_g; /* Communicator to run the PIO */
+int pio_mpi_rank_g; /* MPI rank of pio_comm_g */
+int pio_mpi_nprocs_g; /* number of processes of pio_comm_g */
+
+static int clean_file_g = -1; /*whether to cleanup temporary test */
+ /*files. -1 is not defined; */
+ /*0 is no cleanup; 1 is do cleanup */
+
+
+
/*
* In a parallel machine, the filesystem suitable for compiling is
* unlikely a parallel file system that is suitable for parallel I/O.
@@ -122,10 +133,11 @@ typedef union _file_descr {
static char *pio_create_filename(iotype iot, const char *base_name,
char *fullname, size_t size);
static herr_t do_write(file_descr *fd, iotype iot, long ndsets,
- long nelmts, long buf_size, char *buffer, MPI_Comm comm);
+ long nelmts, long buf_size, char *buffer);
static herr_t do_fopen(iotype iot, char *fname, file_descr *fd /*out*/,
- int flags, MPI_Comm comm);
+ int flags);
static herr_t do_fclose(iotype iot, file_descr *fd);
+static void do_cleanupfile(char *fname);
results
do_pio(parameters param)
@@ -151,9 +163,10 @@ do_pio(parameters param)
herr_t hrc; /*HDF5 return code */
/* MPI variables */
- MPI_Comm comm = MPI_COMM_NULL;
int myrank, nprocs = 1;
+ pio_comm_g = MPI_COMM_NULL;
+
/* Sanity check parameters */
/* IO type */
@@ -215,7 +228,7 @@ do_pio(parameters param)
GOTOERROR(FAIL);
}
-#if 0
+#if 1
/* DEBUG*/
fprintf(stderr, "nfiles=%d\n", nfiles);
fprintf(stderr, "ndsets=%ld\n", ndsets);
@@ -225,14 +238,16 @@ fprintf(stderr, "maxprocs=%d\n", maxprocs);
fprintf(stderr, "buffer size=%ld\n", buf_size);
nfiles=MIN(3, nfiles);
ndsets=MIN(5, ndsets);
+/*nelmts=MIN(1000, nelmts);*/
+buf_size=MIN(1024*1024, buf_size);
/* DEBUG END */
#endif
- /* Create a sub communicator for this run. Easier to use the first N
+ /* Create a sub communicator for this PIO run. Easier to use the first N
* processes. */
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
color = (myrank < maxprocs);
- mrc = MPI_Comm_split(MPI_COMM_WORLD, color, myrank, &comm);
+ mrc = MPI_Comm_split(MPI_COMM_WORLD, color, myrank, &pio_comm_g);
if (mrc != MPI_SUCCESS) {
fprintf(stderr, "MPI_Comm_split failed\n");
@@ -241,13 +256,13 @@ ndsets=MIN(5, ndsets);
if (!color){
/* not involved in this run */
- mrc = MPI_Comm_free(&comm);
+ mrc = MPI_Comm_free(&pio_comm_g);
GOTODONE;
}
- /* determine the mpi rank of in the new comm */
- MPI_Comm_size(comm, &nprocs);
- MPI_Comm_rank(comm, &myrank);
+ /* determine the number of processes and this process's rank in the PIO communicator */
+ MPI_Comm_size(pio_comm_g, &pio_mpi_nprocs_g);
+ MPI_Comm_rank(pio_comm_g, &pio_mpi_rank_g);
/* allocate data buffer */
buffer = malloc(buf_size);
@@ -270,13 +285,13 @@ ndsets=MIN(5, ndsets);
fprintf(stderr, "filename=%s\n", fname);
set_time(res.timers, HDF5_FILE_OPENCLOSE, START);
- hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE, comm);
+ hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE);
set_time(res.timers, HDF5_FILE_OPENCLOSE, STOP);
VRFY((hrc == SUCCESS), "do_fopen failed");
set_time(res.timers, HDF5_WRITE_FIXED_DIMS, START);
- hrc = do_write(&fd, iot, ndsets, nelmts, buf_size, buffer, comm);
+ hrc = do_write(&fd, iot, ndsets, nelmts, buf_size, buffer);
set_time(res.timers, HDF5_WRITE_FIXED_DIMS, STOP);
VRFY((hrc == SUCCESS), "do_write failed");
@@ -292,7 +307,7 @@ fprintf(stderr, "filename=%s\n", fname);
* Read performance measurement
*/
/* Open file for read */
- hrc = do_fopen(iot, fname, &fd, PIO_READ, comm);
+ hrc = do_fopen(iot, fname, &fd, PIO_READ);
VRFY((hrc == SUCCESS), "do_fopen failed");
/*
@@ -304,7 +319,7 @@ fprintf(stderr, "filename=%s\n", fname);
/* Close file for read */
hrc = do_fclose(iot, &fd);
VRFY((hrc == SUCCESS), "do_fclose failed");
- remove(fname);
+ do_cleanupfile(fname);
}
done:
@@ -329,8 +344,8 @@ done:
}
/* release MPI resources */
- if (comm != MPI_COMM_NULL){
- mrc = MPI_Comm_free(&comm);
+ if (pio_comm_g != MPI_COMM_NULL){
+ mrc = MPI_Comm_free(&pio_comm_g);
if (mrc != MPI_SUCCESS) {
fprintf(stderr, "MPI_Comm_free failed\n");
ret_code = FAIL;
@@ -463,7 +478,7 @@ pio_create_filename(iotype iot, const char *base_name, char *fullname, size_t si
*/
static herr_t
do_write(file_descr *fd, iotype iot, long ndsets,
- long nelmts, long buf_size, char *buffer, MPI_Comm comm)
+ long nelmts, long buf_size, char *buffer)
{
int ret_code = SUCCESS;
int rc; /*routine return code */
@@ -474,9 +489,11 @@ do_write(file_descr *fd, iotype iot, long ndsets,
long nelmts_towrite, nelmts_written;
char dname[64];
off_t dset_offset; /*dataset offset in a file */
+ off_t file_offset; /*file offset of the next transfer */
long dset_size; /*one dataset size in bytes */
long nelmts_in_buf;
- int myrank, nprocs = 1;
+ long elmts_begin; /*first elmt this process transfer */
+ long elmts_count; /*number of elmts this process transfer */
/* HDF5 variables */
herr_t hrc; /*HDF5 return code */
@@ -485,10 +502,6 @@ do_write(file_descr *fd, iotype iot, long ndsets,
hid_t h5mem_space_id = -1; /*memory dataspace ID */
hid_t h5ds_id = -1; /* dataset handle */
- /* determine the mpi rank of in this comm */
- MPI_Comm_size(comm, &nprocs);
- MPI_Comm_rank(comm, &myrank);
-
/* calculate dataset parameters. data type is always native C int */
dset_size = nelmts * ELMT_SIZE;
nelmts_in_buf = buf_size/ELMT_SIZE;
@@ -531,17 +544,33 @@ do_write(file_descr *fd, iotype iot, long ndsets,
break;
}
+ /* Calculate the first element and how many elements this process
+ * transfers. First calculate the beginning element of this process
+ * and of the next process. The count of elements is the difference
+ * between these two beginnings, which avoids any rounding errors.
+ */
+ elmts_begin = (nelmts * 1.0)/pio_mpi_nprocs_g*pio_mpi_rank_g;
+ if (pio_mpi_rank_g < (pio_mpi_nprocs_g - 1)){
+ elmts_count = ((nelmts * 1.0)/pio_mpi_nprocs_g*(pio_mpi_rank_g+1)) - elmts_begin;
+ }else{
+ /* last process. Take whatever are left */
+ elmts_count = nelmts - elmts_begin;
+ }
+fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n",
+ pio_mpi_rank_g, elmts_begin, elmts_count);
+
nelmts_written = 0 ;
- while (nelmts_written < nelmts){
- nelmts_towrite = nelmts - nelmts_written;
+ while (nelmts_written < elmts_count){
+ nelmts_towrite = elmts_count - nelmts_written;
- if (nelmts - nelmts_written >= nelmts_in_buf) {
+ if (elmts_count - nelmts_written >= nelmts_in_buf) {
nelmts_towrite = nelmts_in_buf;
} else {
/* last write of a partial buffer */
- nelmts_towrite = nelmts - nelmts_written;
+ nelmts_towrite = elmts_count - nelmts_written;
}
+ file_offset = dset_offset + (elmts_begin + nelmts_written)*ELMT_SIZE;
/*Prepare write data*/
{
@@ -553,18 +582,19 @@ do_write(file_descr *fd, iotype iot, long ndsets,
}
/* Write */
-
+fprintf(stderr, "proc %d: writes %ld bytes at file-offset %ld\n",
+ pio_mpi_rank_g, nelmts_towrite*ELMT_SIZE, file_offset);
/* Calculate offset of write within a dataset/file */
switch (iot){
case RAW:
- rc = RAWSEEK(fd->rawfd, dset_offset + nelmts_written*ELMT_SIZE);
+ rc = RAWSEEK(fd->rawfd, file_offset);
VRFY((rc>=0), "RAWSEEK");
rc = RAWWRITE(fd->rawfd, buffer, nelmts_towrite*ELMT_SIZE);
VRFY((rc==(nelmts_towrite*ELMT_SIZE)), "RAWWRITE");
break;
case MPIO:
- mpi_offset = dset_offset;
+ mpi_offset = file_offset;
mrc = MPI_File_write_at(fd->mpifd, mpi_offset, buffer,
nelmts_towrite*ELMT_SIZE, MPI_CHAR, &mpi_status);
VRFY((mrc==MPI_SUCCESS), "MPIO_WRITE");
@@ -624,8 +654,7 @@ done:
* Modifications:
*/
static herr_t
-do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
- MPI_Comm comm)
+do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags)
{
int ret_code = SUCCESS, mrc;
herr_t hrc;
@@ -633,7 +662,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
switch (iot) {
case RAW:
- if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) {
+ if (flags & (PIO_CREATE | PIO_WRITE)) {
fd->rawfd = RAWCREATE(fname);
} else {
fd->rawfd = RAWOPEN(fname, O_RDONLY);
@@ -647,11 +676,11 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
break;
case MPIO:
- if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) {
- mrc = MPI_File_open(comm, fname, MPI_MODE_CREATE | MPI_MODE_RDWR,
+ if (flags & (PIO_CREATE | PIO_WRITE)) {
+ mrc = MPI_File_open(pio_comm_g, fname, MPI_MODE_CREATE | MPI_MODE_RDWR,
MPI_INFO_NULL, &(fd->mpifd));
} else {
- mrc = MPI_File_open(comm, fname, MPI_MODE_RDONLY,
+ mrc = MPI_File_open(pio_comm_g, fname, MPI_MODE_RDONLY,
MPI_INFO_NULL, &(fd->mpifd));
}
@@ -670,7 +699,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
GOTOERROR(FAIL);
}
- hrc = H5Pset_fapl_mpio(acc_tpl, comm, MPI_INFO_NULL);
+ hrc = H5Pset_fapl_mpio(acc_tpl, pio_comm_g, MPI_INFO_NULL);
if (hrc < 0) {
fprintf(stderr, "HDF5 Property List Set failed\n");
@@ -678,7 +707,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
}
/* create the parallel file */
- if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) {
+ if (flags & (PIO_CREATE | PIO_WRITE)) {
fd->h5fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, acc_tpl);
} else {
fd->h5fd = H5Fopen(fname, H5P_DEFAULT, acc_tpl);
@@ -756,4 +785,22 @@ done:
return ret_code;
}
+
+/*
+ * Remove the temporary file fname unless the environment variable
+ * HDF5_NOCLEANUP is set (looked up once; the decision is cached in clean_file_g).
+ * Only rank 0 (pio_mpi_rank_g == 0) removes the file; other ranks just return.
+ */
+static void do_cleanupfile(char *fname)
+{
+ if (pio_mpi_rank_g != 0) /* only rank 0 performs the cleanup */
+ return;
+
+ if (clean_file_g == -1) /* -1: not decided yet, so consult the environment once */
+ clean_file_g = (getenv("HDF5_NOCLEANUP")==NULL) ? 1 : 0;
+
+fprintf(stderr, "clean_file_g=%d\n", clean_file_g); /* debug trace of the cleanup decision */
+ if (clean_file_g)
+ remove(fname); /* return value of remove() is not checked */
+}
#endif /* H5_HAVE_PARALLEL */