summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamad Chaarawi <chaarawi@hdfgroup.org>2014-08-04 19:58:03 (GMT)
committerMohamad Chaarawi <chaarawi@hdfgroup.org>2014-08-04 19:58:03 (GMT)
commitdb51b5504991175e87e58ffcaca2c34f672c522d (patch)
tree49276bb700833e31e0ac9e6e69c2b8e5c5838fb1
parent4defea00206886aa5dded50896c1ac5c5c1fcceb (diff)
downloadhdf5-db51b5504991175e87e58ffcaca2c34f672c522d.zip
hdf5-db51b5504991175e87e58ffcaca2c34f672c522d.tar.gz
hdf5-db51b5504991175e87e58ffcaca2c34f672c522d.tar.bz2
[svn-r25509] HDFFV-8878:
Make the fill value operation for chunked datasets in parallel collective/fast. tested with h5committest
-rw-r--r--src/H5Dchunk.c245
1 files changed, 205 insertions, 40 deletions
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index 20b3c1a..6914607 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -176,6 +176,13 @@ typedef struct H5D_chunk_file_iter_ud_t {
#endif /* H5_HAVE_PARALLEL */
} H5D_chunk_file_iter_ud_t;
+#ifdef H5_HAVE_PARALLEL
+/* information to construct a collective I/O operation for filling chunks */
+typedef struct H5D_chunk_coll_info_t {
+ size_t num_io; /* Number of write operations */
+ haddr_t *addr; /* array of the file addresses of the write operation */
+} H5D_chunk_coll_info_t;
+#endif /* H5_HAVE_PARALLEL */
/********************/
/* Local Prototypes */
@@ -230,7 +237,10 @@ static herr_t H5D__chunk_cache_evict(const H5D_t *dset, hid_t dxpl_id,
static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id,
const H5D_dxpl_cache_t *dxpl_cache, size_t size);
static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata);
-
+#ifdef H5_HAVE_PARALLEL
+static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id,
+ H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf);
+#endif /* H5_HAVE_PARALLEL */
/*********************/
/* Package Variables */
@@ -3247,6 +3257,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
hsize_t max_unalloc[H5O_LAYOUT_NDIMS]; /* Last chunk in each dimension that is unallocated */
hsize_t chunk_offset[H5O_LAYOUT_NDIMS]; /* Offset of current chunk */
size_t orig_chunk_size; /* Original size of chunk in bytes */
+ size_t chunk_size; /* Actual size of chunk in bytes, possibly filtered */
unsigned filter_mask = 0; /* Filter mask for chunks that have them */
const H5O_layout_t *layout = &(dset->shared->layout); /* Dataset layout */
const H5O_pline_t *pline = &(dset->shared->dcpl_cache.pline); /* I/O pipeline info */
@@ -3256,11 +3267,9 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
H5D_dxpl_cache_t _dxpl_cache; /* Data transfer property cache buffer */
H5D_dxpl_cache_t *dxpl_cache = &_dxpl_cache; /* Data transfer property cache */
#ifdef H5_HAVE_PARALLEL
- MPI_Comm mpi_comm = MPI_COMM_NULL; /* MPI communicator for file */
- int mpi_rank = (-1); /* This process's rank */
- int mpi_code; /* MPI return code */
hbool_t blocks_written = FALSE; /* Flag to indicate that chunk was actually written */
hbool_t using_mpi = FALSE; /* Flag to indicate that the file is being accessed with an MPI-capable file driver */
+ H5D_chunk_coll_info_t chunk_info; /* chunk address information for doing I/O */
#endif /* H5_HAVE_PARALLEL */
hbool_t carry; /* Flag to indicate that chunk increment carrys to higher dimension (sorta) */
int space_ndims; /* Dataset's space rank */
@@ -3269,7 +3278,6 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
int op_dim; /* Current operationg dimension */
H5D_fill_buf_info_t fb_info; /* Dataset's fill buffer info */
hbool_t fb_info_init = FALSE; /* Whether the fill value buffer has been initialized */
- hid_t data_dxpl_id; /* DXPL ID to use for raw data I/O operations */
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_PACKAGE_TAG(dxpl_id, dset->oloc.addr, FAIL)
@@ -3299,30 +3307,17 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
#ifdef H5_HAVE_PARALLEL
/* Retrieve MPI parameters */
if(H5F_HAS_FEATURE(dset->oloc.file, H5FD_FEAT_HAS_MPI)) {
- /* Get the MPI communicator */
- if(MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file)))
- HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator")
-
- /* Get the MPI rank */
- if((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0)
- HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank")
-
/* Set the MPI-capable file driver flag */
using_mpi = TRUE;
- /* Use the internal "independent" DXPL */
- data_dxpl_id = H5AC_ind_dxpl_id;
+ /* init chunk info stuff for collective I/O */
+ chunk_info.num_io = 0;
+ chunk_info.addr = NULL;
} /* end if */
- else {
-#endif /* H5_HAVE_PARALLEL */
- /* Use the DXPL we were given */
- data_dxpl_id = dxpl_id;
-#ifdef H5_HAVE_PARALLEL
- } /* end else */
#endif /* H5_HAVE_PARALLEL */
/* Fill the DXPL cache values for later use */
- if(H5D__get_dxpl_cache(data_dxpl_id, &dxpl_cache) < 0)
+ if(H5D__get_dxpl_cache(dxpl_id, &dxpl_cache) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't fill dxpl cache")
/* Get original chunk size */
@@ -3352,7 +3347,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
if(H5D__fill_init(&fb_info, NULL, (H5MM_allocate_t)H5D__chunk_alloc,
(void *)pline, (H5MM_free_t)H5D__chunk_xfree, (void *)pline,
&dset->shared->dcpl_cache.fill, dset->shared->type,
- dset->shared->type_id, (size_t)0, orig_chunk_size, data_dxpl_id) < 0)
+ dset->shared->type_id, (size_t)0, orig_chunk_size, dxpl_id) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill buffer info")
fb_info_init = TRUE;
@@ -3426,7 +3421,8 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
} /* end else */
while(!carry) {
- size_t chunk_size = orig_chunk_size; /* Size of chunk in bytes, possibly filtered */
+ /* Reset size of chunk in bytes, in case filtered size changes */
+ chunk_size = orig_chunk_size;
#ifndef NDEBUG
/* None of the chunks should be allocated */
@@ -3462,6 +3458,9 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
if(fb_info_init && fb_info.has_vlen_fill_type) {
/* Sanity check */
HDassert(should_fill);
+#ifdef H5_HAVE_PARALLEL
+ HDassert(!using_mpi); /* Can't write VL datatypes in parallel currently */
+#endif
/* Check to make sure the buffer is large enough. It is
* possible (though ill-advised) for the filter to shrink the
@@ -3474,7 +3473,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
} /* end if */
/* Fill the buffer with VL datatype fill values */
- if(H5D__fill_refill_vl(&fb_info, fb_info.elmts_per_buf, data_dxpl_id) < 0)
+ if(H5D__fill_refill_vl(&fb_info, fb_info.elmts_per_buf, dxpl_id) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCONVERT, FAIL, "can't refill fill value buffer")
/* Check if there are filters which need to be applied to the chunk */
@@ -3519,18 +3518,26 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
#ifdef H5_HAVE_PARALLEL
/* Check if this file is accessed with an MPI-capable file driver */
if(using_mpi) {
- /* Write the chunks out from only one process */
- /* !! Use the internal "independent" DXPL!! -QAK */
- if(H5_PAR_META_WRITE == mpi_rank)
- if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, data_dxpl_id, fb_info.fill_buf) < 0)
- HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
+ /* collect all chunk addresses to be written to
+ write collectively at the end */
+ /* allocate/resize address array if no more space left */
+ if(0 == chunk_info.num_io % 1024) {
+ if(NULL == (chunk_info.addr = (haddr_t *)HDrealloc
+ (chunk_info.addr, (chunk_info.num_io + 1024) * sizeof(haddr_t))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "memory allocation failed for chunk addresses");
+ } /* end if */
+
+ /* Store the chunk's address for later */
+ chunk_info.addr[chunk_info.num_io] = udata.addr;
+ chunk_info.num_io++;
- /* Indicate that blocks are being written */
+ /* Indicate that blocks will be written */
blocks_written = TRUE;
} /* end if */
else {
#endif /* H5_HAVE_PARALLEL */
- if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, data_dxpl_id, fb_info.fill_buf) < 0)
+ if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size,
+ dxpl_id, fb_info.fill_buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
#ifdef H5_HAVE_PARALLEL
} /* end else */
@@ -3563,15 +3570,10 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
} /* end for(op_dim=0...) */
#ifdef H5_HAVE_PARALLEL
- /* Only need to block at the barrier if we actually initialized a chunk */
- /* using an MPI-capable file driver */
+ /* do final collective I/O */
if(using_mpi && blocks_written) {
- /* Wait at barrier to avoid race conditions where some processes are
- * still writing out chunks and other processes race ahead to read
- * them in, getting bogus data.
- */
- if(MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
+ if(H5D__chunk_collective_fill(dset, dxpl_id, &chunk_info, chunk_size, fb_info.fill_buf) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
} /* end if */
#endif /* H5_HAVE_PARALLEL */
@@ -3583,9 +3585,172 @@ done:
if(fb_info_init && H5D__fill_term(&fb_info) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info")
+#ifdef H5_HAVE_PARALLEL
+ if(using_mpi) {
+ if(chunk_info.addr)
+ HDfree(chunk_info.addr);
+ } /* end if */
+#endif
+
FUNC_LEAVE_NOAPI_TAG(ret_value, FAIL)
} /* end H5D__chunk_allocate() */
+#ifdef H5_HAVE_PARALLEL
+
+/*-------------------------------------------------------------------------
+ * Function: H5D__chunk_collective_fill
+ *
+ * Purpose: Use MPIO collective write to fill the chunks (if number of
+ * chunks to fill is greater than the number of MPI procs;
+ * otherwise use independent I/O).
+ *
+ * Return: Non-negative on success/Negative on failure
+ *
+ * Programmer: Mohamad Chaarawi
+ * July 30, 2014
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id,
+ H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf)
+{
+ MPI_Comm mpi_comm = MPI_COMM_NULL; /* MPI communicator for file */
+ int mpi_rank = (-1); /* This process's rank */
+ int mpi_size = (-1); /* MPI Comm size */
+ int mpi_code; /* MPI return code */
+ size_t num_blocks; /* Number of blocks between processes. */
+ size_t leftover_blocks; /* Number of leftover blocks to handle */
+ int blocks, leftover, block_len; /* converted to int for MPI */
+ MPI_Aint *chunk_disp_array = NULL;
+ int *block_lens = NULL;
+ MPI_Datatype mem_type, file_type;
+ hid_t data_dxpl_id = -1; /* DXPL ID to use for raw data I/O operations */
+ int i; /* Local index variable */
+ herr_t ret_value = SUCCEED; /* Return value */
+
+ FUNC_ENTER_STATIC
+
+ /* Get the MPI communicator */
+ if(MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file)))
+ HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator")
+
+ /* Get the MPI rank */
+ if((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank")
+
+ /* Get the MPI size */
+ if((mpi_size = H5F_mpi_get_size(dset->oloc.file)) < 0)
+ HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI size")
+
+ /* Get a copy of the DXPL, to modify */
+ if((data_dxpl_id = H5P_copy_plist((H5P_genplist_t *)H5I_object(dxpl_id), TRUE)) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTCOPY, FAIL, "can't copy property list")
+
+ /* Distribute evenly the number of blocks between processes. */
+ num_blocks = chunk_info->num_io / mpi_size; /* value should be the same on all procs */
+
+ /* after evenly distributing the blocks between processes, are
+ there any leftover blocks for each individual process
+ (round-robin) */
+ leftover_blocks = chunk_info->num_io % mpi_size;
+
+ /* Cast values to types needed by MPI */
+ H5_ASSIGN_OVERFLOW(blocks, num_blocks, size_t, int);
+ H5_ASSIGN_OVERFLOW(leftover, leftover_blocks, size_t, int);
+ H5_ASSIGN_OVERFLOW(block_len, chunk_size, size_t, int);
+
+ /* Allocate buffers */
+ /* (MSC - should not need block_lens if MPI_type_create_hindexed_block is working) */
+ if(NULL == (block_lens = (int *)H5MM_malloc((blocks + 1) * sizeof(int))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer")
+ if(NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((blocks + 1) * sizeof(MPI_Aint))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer")
+
+ for(i = 0 ; i < blocks ; i++) {
+ /* store the chunk address as an MPI_Aint */
+ chunk_disp_array[i] = (MPI_Aint)(chunk_info->addr[i + mpi_rank*blocks]);
+
+ /* MSC - should not need this if MPI_type_create_hindexed_block is working */
+ block_lens[i] = block_len;
+
+ /* make sure that the addresses in the datatype are
+ monotonically non decreasing */
+ if(i)
+ HDassert(chunk_disp_array[i] > chunk_disp_array[i - 1]);
+ } /* end if */
+
+ /* calculate if there are any leftover blocks after evenly
+ distributing. If there are, then round robin the distribution
+ to processes 0 -> leftover. */
+ if(leftover && leftover > mpi_rank) {
+ chunk_disp_array[blocks] = (MPI_Aint)chunk_info->addr[blocks*mpi_size + mpi_rank];
+ block_lens[blocks] = block_len;
+ blocks++;
+ }
+
+ /* MSC - should use this if MPI_type_create_hindexed block is working */
+ //mpi_code = MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array,
+ //MPI_BYTE, &file_type);
+ mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array,
+ MPI_BYTE, &file_type);
+ if(mpi_code != MPI_SUCCESS)
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+
+ mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
+ if(mpi_code != MPI_SUCCESS)
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+
+ /* set MPI-IO VFD properties */
+ {
+ H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_COLLECTIVE;
+ H5P_genplist_t *plist; /* Property list pointer */
+
+ if(NULL == (plist = H5P_object_verify(data_dxpl_id, H5P_DATASET_XFER)))
+ HGOTO_ERROR(H5E_PLIST, H5E_BADTYPE, FAIL, "not a dataset transfer list")
+
+ /* Set buffer MPI type */
+ if(H5P_set(plist, H5FD_MPI_XFER_MEM_MPI_TYPE_NAME, &mem_type) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property")
+
+ /* Set File MPI type */
+ if(H5P_set(plist, H5FD_MPI_XFER_FILE_MPI_TYPE_NAME, &file_type) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property")
+
+ /* set transfer mode */
+ if(H5P_set(plist, H5D_XFER_IO_XFER_MODE_NAME, &xfer_mode) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set transfer mode")
+ }
+
+ /* low level write */
+ if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, (haddr_t)0, (blocks) ? (size_t)1 : (size_t)0,
+ data_dxpl_id, fill_buf) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
+
+ /* Barrier so processes don't race ahead */
+ if(MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
+
+done:
+ if(data_dxpl_id > 0 && H5I_dec_ref(data_dxpl_id) < 0)
+ HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't free property list")
+
+ /* free things */
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ if(MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
+ HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+ H5MM_xfree(chunk_disp_array);
+ H5MM_xfree(block_lens);
+
+ FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__chunk_collective_fill() */
+#endif /* H5_HAVE_PARALLEL */
+
/*-------------------------------------------------------------------------
* Function: H5D__chunk_prune_fill