diff options
author | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2014-08-04 19:58:03 (GMT) |
---|---|---|
committer | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2014-08-04 19:58:03 (GMT) |
commit | db51b5504991175e87e58ffcaca2c34f672c522d (patch) | |
tree | 49276bb700833e31e0ac9e6e69c2b8e5c5838fb1 /src/H5Dchunk.c | |
parent | 4defea00206886aa5dded50896c1ac5c5c1fcceb (diff) | |
download | hdf5-db51b5504991175e87e58ffcaca2c34f672c522d.zip hdf5-db51b5504991175e87e58ffcaca2c34f672c522d.tar.gz hdf5-db51b5504991175e87e58ffcaca2c34f672c522d.tar.bz2 |
[svn-r25509] HDFFV-8878:
Make the fill value operation for chunked datasets in parallel collective/fast.
tested with h5committest
Diffstat (limited to 'src/H5Dchunk.c')
-rw-r--r-- | src/H5Dchunk.c | 245 |
1 files changed, 205 insertions, 40 deletions
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c index 20b3c1a..6914607 100644 --- a/src/H5Dchunk.c +++ b/src/H5Dchunk.c @@ -176,6 +176,13 @@ typedef struct H5D_chunk_file_iter_ud_t { #endif /* H5_HAVE_PARALLEL */ } H5D_chunk_file_iter_ud_t; +#ifdef H5_HAVE_PARALLEL +/* information to construct a collective I/O operation for filling chunks */ +typedef struct H5D_chunk_coll_info_t { + size_t num_io; /* Number of write operations */ + haddr_t *addr; /* array of the file addresses of the write operation */ +} H5D_chunk_coll_info_t; +#endif /* H5_HAVE_PARALLEL */ /********************/ /* Local Prototypes */ @@ -230,7 +237,10 @@ static herr_t H5D__chunk_cache_evict(const H5D_t *dset, hid_t dxpl_id, static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id, const H5D_dxpl_cache_t *dxpl_cache, size_t size); static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata); - +#ifdef H5_HAVE_PARALLEL +static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id, + H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf); +#endif /* H5_HAVE_PARALLEL */ /*********************/ /* Package Variables */ @@ -3247,6 +3257,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, hsize_t max_unalloc[H5O_LAYOUT_NDIMS]; /* Last chunk in each dimension that is unallocated */ hsize_t chunk_offset[H5O_LAYOUT_NDIMS]; /* Offset of current chunk */ size_t orig_chunk_size; /* Original size of chunk in bytes */ + size_t chunk_size; /* Actual size of chunk in bytes, possibly filtered */ unsigned filter_mask = 0; /* Filter mask for chunks that have them */ const H5O_layout_t *layout = &(dset->shared->layout); /* Dataset layout */ const H5O_pline_t *pline = &(dset->shared->dcpl_cache.pline); /* I/O pipeline info */ @@ -3256,11 +3267,9 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, H5D_dxpl_cache_t _dxpl_cache; /* Data transfer property cache buffer */ H5D_dxpl_cache_t *dxpl_cache = &_dxpl_cache; /* Data transfer property cache */ #ifdef H5_HAVE_PARALLEL - MPI_Comm mpi_comm = MPI_COMM_NULL; /* MPI communicator for file */ - int mpi_rank = (-1); /* This process's rank */ - int mpi_code; /* MPI return code */ hbool_t blocks_written = FALSE; /* Flag to indicate that chunk was actually written */ hbool_t using_mpi = FALSE; /* Flag to indicate that the file is being accessed with an MPI-capable file driver */ + H5D_chunk_coll_info_t chunk_info; /* chunk address information for doing I/O */ #endif /* H5_HAVE_PARALLEL */ hbool_t carry; /* Flag to indicate that chunk increment carrys to higher dimension (sorta) */ int space_ndims; /* Dataset's space rank */ @@ -3269,7 +3278,6 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, int op_dim; /* Current operationg dimension */ H5D_fill_buf_info_t fb_info; /* Dataset's fill buffer info */ hbool_t fb_info_init = FALSE; /* Whether the fill value buffer has been initialized */ - hid_t data_dxpl_id; /* DXPL ID to use for raw data I/O operations */ herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_PACKAGE_TAG(dxpl_id, dset->oloc.addr, FAIL) @@ -3299,30 +3307,17 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, #ifdef H5_HAVE_PARALLEL /* Retrieve MPI parameters */ if(H5F_HAS_FEATURE(dset->oloc.file, H5FD_FEAT_HAS_MPI)) { - /* Get the MPI communicator */ - if(MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file))) - HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator") - - /* Get the MPI rank */ - if((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0) - HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank") - /* Set the MPI-capable file driver flag */ using_mpi = TRUE; - /* Use the internal "independent" DXPL */ - data_dxpl_id = H5AC_ind_dxpl_id; + /* init chunk info stuff for collective I/O */ + chunk_info.num_io = 0; + chunk_info.addr = NULL; } /* end if */ - else { -#endif /* H5_HAVE_PARALLEL */ - /* Use the DXPL we were given */ - data_dxpl_id = dxpl_id; -#ifdef H5_HAVE_PARALLEL - } /* end else */ #endif /* H5_HAVE_PARALLEL */ /* Fill the DXPL cache values for later use */ - if(H5D__get_dxpl_cache(data_dxpl_id, &dxpl_cache) < 0) + if(H5D__get_dxpl_cache(dxpl_id, &dxpl_cache) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't fill dxpl cache") /* Get original chunk size */ @@ -3352,7 +3347,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, if(H5D__fill_init(&fb_info, NULL, (H5MM_allocate_t)H5D__chunk_alloc, (void *)pline, (H5MM_free_t)H5D__chunk_xfree, (void *)pline, &dset->shared->dcpl_cache.fill, dset->shared->type, - dset->shared->type_id, (size_t)0, orig_chunk_size, data_dxpl_id) < 0) + dset->shared->type_id, (size_t)0, orig_chunk_size, dxpl_id) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill buffer info") fb_info_init = TRUE; @@ -3426,7 +3421,8 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, } /* end else */ while(!carry) { - size_t chunk_size = orig_chunk_size; /* Size of chunk in bytes, possibly filtered */ + /* Reset size of chunk in bytes, in case filtered size changes */ + chunk_size = orig_chunk_size; #ifndef NDEBUG /* None of the chunks should be allocated */ @@ -3462,6 +3458,9 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, if(fb_info_init && fb_info.has_vlen_fill_type) { /* Sanity check */ HDassert(should_fill); +#ifdef H5_HAVE_PARALLEL + HDassert(!using_mpi); /* Can't write VL datatypes in parallel currently */ +#endif /* Check to make sure the buffer is large enough. It is * possible (though ill-advised) for the filter to shrink the @@ -3474,7 +3473,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, } /* end if */ /* Fill the buffer with VL datatype fill values */ - if(H5D__fill_refill_vl(&fb_info, fb_info.elmts_per_buf, data_dxpl_id) < 0) + if(H5D__fill_refill_vl(&fb_info, fb_info.elmts_per_buf, dxpl_id) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCONVERT, FAIL, "can't refill fill value buffer") /* Check if there are filters which need to be applied to the chunk */ @@ -3519,18 +3518,26 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, #ifdef H5_HAVE_PARALLEL /* Check if this file is accessed with an MPI-capable file driver */ if(using_mpi) { - /* Write the chunks out from only one process */ - /* !! Use the internal "independent" DXPL!! -QAK */ - if(H5_PAR_META_WRITE == mpi_rank) - if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, data_dxpl_id, fb_info.fill_buf) < 0) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file") + /* collect all chunk addresses to be written to + write collectively at the end */ + /* allocate/resize address array if no more space left */ + if(0 == chunk_info.num_io % 1024) { + if(NULL == (chunk_info.addr = (haddr_t *)HDrealloc + (chunk_info.addr, (chunk_info.num_io + 1024) * sizeof(haddr_t)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "memory allocation failed for chunk addresses"); + } /* end if */ + + /* Store the chunk's address for later */ + chunk_info.addr[chunk_info.num_io] = udata.addr; + chunk_info.num_io++; - /* Indicate that blocks are being written */ + /* Indicate that blocks will be written */ blocks_written = TRUE; } /* end if */ else { #endif /* H5_HAVE_PARALLEL */ - if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, data_dxpl_id, fb_info.fill_buf) < 0) + if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, + dxpl_id, fb_info.fill_buf) < 0) HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file") #ifdef H5_HAVE_PARALLEL } /* end else */ @@ -3563,15 +3570,10 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite, } /* end for(op_dim=0...) */ #ifdef H5_HAVE_PARALLEL - /* Only need to block at the barrier if we actually initialized a chunk */ - /* using an MPI-capable file driver */ + /* do final collective I/O */ if(using_mpi && blocks_written) { - /* Wait at barrier to avoid race conditions where some processes are - * still writing out chunks and other processes race ahead to read - * them in, getting bogus data. - */ - if(MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm))) - HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code) + if(H5D__chunk_collective_fill(dset, dxpl_id, &chunk_info, chunk_size, fb_info.fill_buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file") } /* end if */ #endif /* H5_HAVE_PARALLEL */ @@ -3583,9 +3585,172 @@ done: if(fb_info_init && H5D__fill_term(&fb_info) < 0) HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't release fill buffer info") +#ifdef H5_HAVE_PARALLEL + if(using_mpi) { + if(chunk_info.addr) + HDfree(chunk_info.addr); + } /* end if */ +#endif + FUNC_LEAVE_NOAPI_TAG(ret_value, FAIL) } /* end H5D__chunk_allocate() */ +#ifdef H5_HAVE_PARALLEL + +/*------------------------------------------------------------------------- + * Function: H5D__chunk_collective_fill + * + * Purpose: Use MPIO collective write to fill the chunks (if number of + * chunks to fill is greater than the number of MPI procs; + * otherwise use independent I/O). + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Mohamad Chaarawi + * July 30, 2014 + * + *------------------------------------------------------------------------- + */ +static herr_t +H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id, + H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf) +{ + MPI_Comm mpi_comm = MPI_COMM_NULL; /* MPI communicator for file */ + int mpi_rank = (-1); /* This process's rank */ + int mpi_size = (-1); /* MPI Comm size */ + int mpi_code; /* MPI return code */ + size_t num_blocks; /* Number of blocks between processes. */ + size_t leftover_blocks; /* Number of leftover blocks to handle */ + int blocks, leftover, block_len; /* converted to int for MPI */ + MPI_Aint *chunk_disp_array = NULL; + int *block_lens = NULL; + MPI_Datatype mem_type, file_type; + hid_t data_dxpl_id = -1; /* DXPL ID to use for raw data I/O operations */ + int i; /* Local index variable */ + herr_t ret_value = SUCCEED; /* Return value */ + + FUNC_ENTER_STATIC + + /* Get the MPI communicator */ + if(MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file))) + HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator") + + /* Get the MPI rank */ + if((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank") + + /* Get the MPI size */ + if((mpi_size = H5F_mpi_get_size(dset->oloc.file)) < 0) + HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI size") + + /* Get a copy of the DXPL, to modify */ + if((data_dxpl_id = H5P_copy_plist((H5P_genplist_t *)H5I_object(dxpl_id), TRUE)) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTCOPY, FAIL, "can't copy property list") + + /* Distribute evenly the number of blocks between processes. */ + num_blocks = chunk_info->num_io / mpi_size; /* value should be the same on all procs */ + + /* after evenly distributing the blocks between processes, are + there any leftover blocks for each individual process + (round-robin) */ + leftover_blocks = chunk_info->num_io % mpi_size; + + /* Cast values to types needed by MPI */ + H5_ASSIGN_OVERFLOW(blocks, num_blocks, size_t, int); + H5_ASSIGN_OVERFLOW(leftover, leftover_blocks, size_t, int); + H5_ASSIGN_OVERFLOW(block_len, chunk_size, size_t, int); + + /* Allocate buffers */ + /* (MSC - should not need block_lens if MPI_type_create_hindexed_block is working) */ + if(NULL == (block_lens = (int *)H5MM_malloc((blocks + 1) * sizeof(int)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer") + if(NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((blocks + 1) * sizeof(MPI_Aint)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer") + + for(i = 0 ; i < blocks ; i++) { + /* store the chunk address as an MPI_Aint */ + chunk_disp_array[i] = (MPI_Aint)(chunk_info->addr[i + mpi_rank*blocks]); + + /* MSC - should not need this if MPI_type_create_hindexed_block is working */ + block_lens[i] = block_len; + + /* make sure that the addresses in the datatype are + monotonically non decreasing */ + if(i) + HDassert(chunk_disp_array[i] > chunk_disp_array[i - 1]); + } /* end if */ + + /* calculate if there are any leftover blocks after evenly + distributing. If there are, then round robin the distribution + to processes 0 -> leftover. */ + if(leftover && leftover > mpi_rank) { + chunk_disp_array[blocks] = (MPI_Aint)chunk_info->addr[blocks*mpi_size + mpi_rank]; + block_lens[blocks] = block_len; + blocks++; + } + + /* MSC - should use this if MPI_type_create_hindexed block is working */ + //mpi_code = MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, + //MPI_BYTE, &file_type); + mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, + MPI_BYTE, &file_type); + if(mpi_code != MPI_SUCCESS) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type); + if(mpi_code != MPI_SUCCESS) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type))) + HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) + + /* set MPI-IO VFD properties */ + { + H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_COLLECTIVE; + H5P_genplist_t *plist; /* Property list pointer */ + + if(NULL == (plist = H5P_object_verify(data_dxpl_id, H5P_DATASET_XFER))) + HGOTO_ERROR(H5E_PLIST, H5E_BADTYPE, FAIL, "not a dataset transfer list") + + /* Set buffer MPI type */ + if(H5P_set(plist, H5FD_MPI_XFER_MEM_MPI_TYPE_NAME, &mem_type) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property") + + /* Set File MPI type */ + if(H5P_set(plist, H5FD_MPI_XFER_FILE_MPI_TYPE_NAME, &file_type) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property") + + /* set transfer mode */ + if(H5P_set(plist, H5D_XFER_IO_XFER_MODE_NAME, &xfer_mode) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set transfer mode") + } + + /* low level write */ + if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, (haddr_t)0, (blocks) ? (size_t)1 : (size_t)0, + data_dxpl_id, fill_buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file") + + /* Barrier so processes don't race ahead */ + if(MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code) + +done: + if(data_dxpl_id > 0 && H5I_dec_ref(data_dxpl_id) < 0) + HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't free property list") + + /* free things */ + if(MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + if(MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type))) + HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code) + H5MM_xfree(chunk_disp_array); + H5MM_xfree(block_lens); + + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5D__chunk_collective_fill() */ +#endif /* H5_HAVE_PARALLEL */ + /*------------------------------------------------------------------------- * Function: H5D__chunk_prune_fill |