summaryrefslogtreecommitdiffstats
path: root/perform/pio_engine.c
diff options
context:
space:
mode:
authorAlbert Cheng <acheng@hdfgroup.org>2001-12-12 19:55:21 (GMT)
committerAlbert Cheng <acheng@hdfgroup.org>2001-12-12 19:55:21 (GMT)
commitc227600599dd8b5ecfdb27c8310a612214d79969 (patch)
tree71a84843d35df26fbf440d00018088cd309cf3b1 /perform/pio_engine.c
parentfd48265a02402e3911dadf4dc6895daa1eb39543 (diff)
downloadhdf5-c227600599dd8b5ecfdb27c8310a612214d79969.zip
hdf5-c227600599dd8b5ecfdb27c8310a612214d79969.tar.gz
hdf5-c227600599dd8b5ecfdb27c8310a612214d79969.tar.bz2
[svn-r4708] Purpose:
Bug fixes, new features. Description: There was a coding error in handling file open flags: they were tested with | instead of &. Changed the tests to use &. Added do_cleanupfile to clean up temporary test files, but only if $HDF5_NOCLEANUP is not set. This is consistent with other test programs. Added logic so that each process writes only its own slabs of data. Moved the number of processes, the rank of the process, and the communicator used for the PIO run into global variables. This makes the coding easier (but it is not thread-safe). Platforms tested: modi4(pp) and eirene(pp).
Diffstat (limited to 'perform/pio_engine.c')
-rw-r--r--perform/pio_engine.c125
1 files changed, 86 insertions, 39 deletions
diff --git a/perform/pio_engine.c b/perform/pio_engine.c
index 9bc026a..e35276b 100644
--- a/perform/pio_engine.c
+++ b/perform/pio_engine.c
@@ -35,13 +35,13 @@
#define GOTOERROR(errcode) { ret_code = errcode; goto done; }
#define GOTODONE { goto done; }
#define ERRMSG(mesg) { \
- fprintf(stderr, "Proc %d: ", myrank); \
+ fprintf(stderr, "Proc %d: ", pio_mpi_rank_g); \
fprintf(stderr, "*** Assertion failed (%s) at line %4d in %s\n", \
mesg, (int)__LINE__, __FILE__); \
}
#define MSG(mesg) { \
- fprintf(stderr, "Proc %d: ", myrank); \
+ fprintf(stderr, "Proc %d: ", pio_mpi_rank_g); \
fprintf(stderr, "(%s) at line %4d in %s\n", \
mesg, (int)__LINE__, __FILE__); \
}
@@ -92,6 +92,17 @@ enum {
PIO_READ = 4
};
+/* Global variables */
+MPI_Comm pio_comm_g; /* Communicator to run the PIO */
+int pio_mpi_rank_g; /* MPI rank of pio_comm_g */
+int pio_mpi_nprocs_g; /* number of processes of pio_comm_g */
+
+static int clean_file_g = -1; /*whether to cleanup temporary test */
+ /*files. -1 is not defined; */
+ /*0 is no cleanup; 1 is do cleanup */
+
+
+
/*
* In a parallel machine, the filesystem suitable for compiling is
* unlikely a parallel file system that is suitable for parallel I/O.
@@ -122,10 +133,11 @@ typedef union _file_descr {
static char *pio_create_filename(iotype iot, const char *base_name,
char *fullname, size_t size);
static herr_t do_write(file_descr *fd, iotype iot, long ndsets,
- long nelmts, long buf_size, char *buffer, MPI_Comm comm);
+ long nelmts, long buf_size, char *buffer);
static herr_t do_fopen(iotype iot, char *fname, file_descr *fd /*out*/,
- int flags, MPI_Comm comm);
+ int flags);
static herr_t do_fclose(iotype iot, file_descr *fd);
+static void do_cleanupfile(char *fname);
results
do_pio(parameters param)
@@ -151,9 +163,10 @@ do_pio(parameters param)
herr_t hrc; /*HDF5 return code */
/* MPI variables */
- MPI_Comm comm = MPI_COMM_NULL;
int myrank, nprocs = 1;
+ pio_comm_g = MPI_COMM_NULL;
+
/* Sanity check parameters */
/* IO type */
@@ -215,7 +228,7 @@ do_pio(parameters param)
GOTOERROR(FAIL);
}
-#if 0
+#if 1
/* DEBUG*/
fprintf(stderr, "nfiles=%d\n", nfiles);
fprintf(stderr, "ndsets=%ld\n", ndsets);
@@ -225,14 +238,16 @@ fprintf(stderr, "maxprocs=%d\n", maxprocs);
fprintf(stderr, "buffer size=%ld\n", buf_size);
nfiles=MIN(3, nfiles);
ndsets=MIN(5, ndsets);
+/*nelmts=MIN(1000, nelmts);*/
+buf_size=MIN(1024*1024, buf_size);
/* DEBUG END */
#endif
- /* Create a sub communicator for this run. Easier to use the first N
+ /* Create a sub communicator for this PIO run. Easier to use the first N
* processes. */
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
color = (myrank < maxprocs);
- mrc = MPI_Comm_split(MPI_COMM_WORLD, color, myrank, &comm);
+ mrc = MPI_Comm_split(MPI_COMM_WORLD, color, myrank, &pio_comm_g);
if (mrc != MPI_SUCCESS) {
fprintf(stderr, "MPI_Comm_split failed\n");
@@ -241,13 +256,13 @@ ndsets=MIN(5, ndsets);
if (!color){
/* not involved in this run */
- mrc = MPI_Comm_free(&comm);
+ mrc = MPI_Comm_free(&pio_comm_g);
GOTODONE;
}
- /* determine the mpi rank of in the new comm */
- MPI_Comm_size(comm, &nprocs);
- MPI_Comm_rank(comm, &myrank);
+ /* determine the number of processes and this process's MPI rank in the PIO communicator */
+ MPI_Comm_size(pio_comm_g, &pio_mpi_nprocs_g);
+ MPI_Comm_rank(pio_comm_g, &pio_mpi_rank_g);
/* allocate data buffer */
buffer = malloc(buf_size);
@@ -270,13 +285,13 @@ ndsets=MIN(5, ndsets);
fprintf(stderr, "filename=%s\n", fname);
set_time(res.timers, HDF5_FILE_OPENCLOSE, START);
- hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE, comm);
+ hrc = do_fopen(iot, fname, &fd, PIO_CREATE | PIO_WRITE);
set_time(res.timers, HDF5_FILE_OPENCLOSE, STOP);
VRFY((hrc == SUCCESS), "do_fopen failed");
set_time(res.timers, HDF5_WRITE_FIXED_DIMS, START);
- hrc = do_write(&fd, iot, ndsets, nelmts, buf_size, buffer, comm);
+ hrc = do_write(&fd, iot, ndsets, nelmts, buf_size, buffer);
set_time(res.timers, HDF5_WRITE_FIXED_DIMS, STOP);
VRFY((hrc == SUCCESS), "do_write failed");
@@ -292,7 +307,7 @@ fprintf(stderr, "filename=%s\n", fname);
* Read performance measurement
*/
/* Open file for read */
- hrc = do_fopen(iot, fname, &fd, PIO_READ, comm);
+ hrc = do_fopen(iot, fname, &fd, PIO_READ);
VRFY((hrc == SUCCESS), "do_fopen failed");
/*
@@ -304,7 +319,7 @@ fprintf(stderr, "filename=%s\n", fname);
/* Close file for read */
hrc = do_fclose(iot, &fd);
VRFY((hrc == SUCCESS), "do_fclose failed");
- remove(fname);
+ do_cleanupfile(fname);
}
done:
@@ -329,8 +344,8 @@ done:
}
/* release MPI resources */
- if (comm != MPI_COMM_NULL){
- mrc = MPI_Comm_free(&comm);
+ if (pio_comm_g != MPI_COMM_NULL){
+ mrc = MPI_Comm_free(&pio_comm_g);
if (mrc != MPI_SUCCESS) {
fprintf(stderr, "MPI_Comm_free failed\n");
ret_code = FAIL;
@@ -463,7 +478,7 @@ pio_create_filename(iotype iot, const char *base_name, char *fullname, size_t si
*/
static herr_t
do_write(file_descr *fd, iotype iot, long ndsets,
- long nelmts, long buf_size, char *buffer, MPI_Comm comm)
+ long nelmts, long buf_size, char *buffer)
{
int ret_code = SUCCESS;
int rc; /*routine return code */
@@ -474,9 +489,11 @@ do_write(file_descr *fd, iotype iot, long ndsets,
long nelmts_towrite, nelmts_written;
char dname[64];
off_t dset_offset; /*dataset offset in a file */
+ off_t file_offset; /*file offset of the next transfer */
long dset_size; /*one dataset size in bytes */
long nelmts_in_buf;
- int myrank, nprocs = 1;
+ long elmts_begin; /*first elmt this process transfer */
+ long elmts_count; /*number of elmts this process transfer */
/* HDF5 variables */
herr_t hrc; /*HDF5 return code */
@@ -485,10 +502,6 @@ do_write(file_descr *fd, iotype iot, long ndsets,
hid_t h5mem_space_id = -1; /*memory dataspace ID */
hid_t h5ds_id = -1; /* dataset handle */
- /* determine the mpi rank of in this comm */
- MPI_Comm_size(comm, &nprocs);
- MPI_Comm_rank(comm, &myrank);
-
/* calculate dataset parameters. data type is always native C int */
dset_size = nelmts * ELMT_SIZE;
nelmts_in_buf = buf_size/ELMT_SIZE;
@@ -531,17 +544,33 @@ do_write(file_descr *fd, iotype iot, long ndsets,
break;
}
+ /* Calculate the first element and how many elements this process
+ * transfer. First calculate the beginning element of this process
+ * and the next process. Count of elements is the difference between
+ * these two beginnings. This way, it avoids any rounding errors.
+ */
+ elmts_begin = (nelmts * 1.0)/pio_mpi_nprocs_g*pio_mpi_rank_g;
+ if (pio_mpi_rank_g < (pio_mpi_nprocs_g - 1)){
+ elmts_count = ((nelmts * 1.0)/pio_mpi_nprocs_g*(pio_mpi_rank_g+1)) - elmts_begin;
+ }else{
+ /* last process. Take whatever are left */
+ elmts_count = nelmts - elmts_begin;
+ }
+fprintf(stderr, "proc %d: elmts_begin=%ld, elmts_count=%ld\n",
+ pio_mpi_rank_g, elmts_begin, elmts_count);
+
nelmts_written = 0 ;
- while (nelmts_written < nelmts){
- nelmts_towrite = nelmts - nelmts_written;
+ while (nelmts_written < elmts_count){
+ nelmts_towrite = elmts_count - nelmts_written;
- if (nelmts - nelmts_written >= nelmts_in_buf) {
+ if (elmts_count - nelmts_written >= nelmts_in_buf) {
nelmts_towrite = nelmts_in_buf;
} else {
/* last write of a partial buffer */
- nelmts_towrite = nelmts - nelmts_written;
+ nelmts_towrite = elmts_count - nelmts_written;
}
+ file_offset = dset_offset + (elmts_begin + nelmts_written)*ELMT_SIZE;
/*Prepare write data*/
{
@@ -553,18 +582,19 @@ do_write(file_descr *fd, iotype iot, long ndsets,
}
/* Write */
-
+fprintf(stderr, "proc %d: writes %ld bytes at file-offset %ld\n",
+ pio_mpi_rank_g, nelmts_towrite*ELMT_SIZE, file_offset);
/* Calculate offset of write within a dataset/file */
switch (iot){
case RAW:
- rc = RAWSEEK(fd->rawfd, dset_offset + nelmts_written*ELMT_SIZE);
+ rc = RAWSEEK(fd->rawfd, file_offset);
VRFY((rc>=0), "RAWSEEK");
rc = RAWWRITE(fd->rawfd, buffer, nelmts_towrite*ELMT_SIZE);
VRFY((rc==(nelmts_towrite*ELMT_SIZE)), "RAWWRITE");
break;
case MPIO:
- mpi_offset = dset_offset;
+ mpi_offset = file_offset;
mrc = MPI_File_write_at(fd->mpifd, mpi_offset, buffer,
nelmts_towrite*ELMT_SIZE, MPI_CHAR, &mpi_status);
VRFY((mrc==MPI_SUCCESS), "MPIO_WRITE");
@@ -624,8 +654,7 @@ done:
* Modifications:
*/
static herr_t
-do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
- MPI_Comm comm)
+do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags)
{
int ret_code = SUCCESS, mrc;
herr_t hrc;
@@ -633,7 +662,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
switch (iot) {
case RAW:
- if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) {
+ if (flags & (PIO_CREATE | PIO_WRITE)) {
fd->rawfd = RAWCREATE(fname);
} else {
fd->rawfd = RAWOPEN(fname, O_RDONLY);
@@ -647,11 +676,11 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
break;
case MPIO:
- if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) {
- mrc = MPI_File_open(comm, fname, MPI_MODE_CREATE | MPI_MODE_RDWR,
+ if (flags & (PIO_CREATE | PIO_WRITE)) {
+ mrc = MPI_File_open(pio_comm_g, fname, MPI_MODE_CREATE | MPI_MODE_RDWR,
MPI_INFO_NULL, &(fd->mpifd));
} else {
- mrc = MPI_File_open(comm, fname, MPI_MODE_RDONLY,
+ mrc = MPI_File_open(pio_comm_g, fname, MPI_MODE_RDONLY,
MPI_INFO_NULL, &(fd->mpifd));
}
@@ -670,7 +699,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
GOTOERROR(FAIL);
}
- hrc = H5Pset_fapl_mpio(acc_tpl, comm, MPI_INFO_NULL);
+ hrc = H5Pset_fapl_mpio(acc_tpl, pio_comm_g, MPI_INFO_NULL);
if (hrc < 0) {
fprintf(stderr, "HDF5 Property List Set failed\n");
@@ -678,7 +707,7 @@ do_fopen(iotype iot, char *fname, file_descr *fd /*out*/, int flags,
}
/* create the parallel file */
- if ((flags | PIO_CREATE) || (flags | PIO_WRITE)) {
+ if (flags & (PIO_CREATE | PIO_WRITE)) {
fd->h5fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, acc_tpl);
} else {
fd->h5fd = H5Fopen(fname, H5P_DEFAULT, acc_tpl);
@@ -756,4 +785,22 @@ done:
return ret_code;
}
+
+/*
+ * Clean up the temporary file fname unless HDF5_NOCLEANUP is set.
+ * Only Proc 0 of the PIO communicator does the cleanup; other processes
+ * just return.
+ */
+static void do_cleanupfile(char *fname)
+{
+ if (pio_mpi_rank_g != 0)
+ return;
+
+ if (clean_file_g == -1)
+ clean_file_g = (getenv("HDF5_NOCLEANUP")==NULL) ? 1 : 0;
+
+fprintf(stderr, "clean_file_g=%d\n", clean_file_g);
+ if (clean_file_g)
+ remove(fname);
+}
#endif /* H5_HAVE_PARALLEL */