summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJordan Henderson <jhenderson@hdfgroup.org>2017-01-17 20:39:41 (GMT)
committerJordan Henderson <jhenderson@hdfgroup.org>2017-01-17 20:39:41 (GMT)
commit349b3634f73954eb17fd8f2ee730e43606959467 (patch)
treebc9be36ff1234be8344678dcda89c16cb8f582d6
parent2292a8520df0738826e22e2e097495761f3e5d28 (diff)
downloadhdf5-349b3634f73954eb17fd8f2ee730e43606959467.zip
hdf5-349b3634f73954eb17fd8f2ee730e43606959467.tar.gz
hdf5-349b3634f73954eb17fd8f2ee730e43606959467.tar.bz2
Add chunk redistribution for shared chunks
Remove check for process having a selection as this needs to be reworked
-rw-r--r--src/H5Dmpio.c438
1 files changed, 248 insertions, 190 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 6ae4ef8..49f25ae 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -105,7 +105,9 @@ typedef struct H5D_filtered_collective_io_info_t {
H5D_chunk_info_t chunk_info;
H5F_block_t old_chunk;
H5F_block_t new_chunk;
- int num_writers;
+ size_t io_size;
+ size_t num_writers;
+ int owner;
void *buf;
} H5D_filtered_collective_io_info_t;
@@ -424,7 +426,7 @@ done:
H5MM_free(displacements_array);
FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__filtered_collective_io_info_arraygather() */
+} /* end H5D__mpio_array_gather() */
/*-------------------------------------------------------------------------
@@ -1302,8 +1304,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
H5D_chk_idx_info_t index_info;
H5S_sel_iter_t *mem_iter = NULL;
H5D_storage_t ctg_store;
- MPI_Datatype mem_type;
- MPI_Datatype file_type;
+ MPI_Datatype mem_type = MPI_BYTE;
+ MPI_Datatype file_type = MPI_BYTE;
hbool_t mem_type_is_derived = FALSE;
hbool_t file_type_is_derived = FALSE;
hbool_t mem_iter_init = FALSE;
@@ -1352,228 +1354,208 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
* still have to do the re-allocation in the file. Get rid of else case
* and instead change mpi_buf_count to 0 if they have no selection
*/
- if (H5SL_count(fm->sel_chunks)) {
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Incoming messages from other processes:\n");
- HDfprintf(debug_file, "-----------------------------------------\n");
- for (size_t j = 0; j < chunk_list_num_entries; j++) {
- HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
- chunk_list[j].old_chunk.offset, chunk_list[j].num_writers);
- }
- HDfprintf(debug_file, "-----------------------------------------\n\n");
+ HDfprintf(debug_file, "Incoming messages from other processes:\n");
+ HDfprintf(debug_file, "-----------------------------------------\n");
+ for (size_t j = 0; j < chunk_list_num_entries; j++) {
+ HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
+ chunk_list[j].old_chunk.offset, chunk_list[j].num_writers - 1);
+ }
+ HDfprintf(debug_file, "-----------------------------------------\n\n");
#endif
- if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+ if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
- if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
+ if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Processing chunks:\n");
- HDfprintf(debug_file, "---------------------------------------------------\n");
+ HDfprintf(debug_file, "Processing chunks:\n");
+ HDfprintf(debug_file, "---------------------------------------------------\n");
#endif
- /* Iterate through all the chunks in the collective write operation,
- * updating each chunk with the data modifications from other processes,
- * then re-filtering the chunk. */
- for (i = 0; i < chunk_list_num_entries; i++) {
- unsigned filter_mask = 0;
- hbool_t full_overwrite = TRUE;
- size_t buf_size;
- hssize_t iter_nelmts;
+ /* Iterate through all the chunks in the collective write operation,
+ * updating each chunk with the data modifications from other processes,
+ * then re-filtering the chunk. */
+ for (i = 0; i < chunk_list_num_entries; i++) {
+ unsigned filter_mask = 0;
+ hbool_t full_overwrite = TRUE;
+ size_t buf_size;
+ hssize_t iter_nelmts;
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
+ HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_list[i].old_chunk.offset);
#endif
- /* If this is a full overwrite of this chunk, enough memory must be allocated for
- * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
- * to read the filtered chunk from the file. */
- /* XXX: Return value of macro should be checked instead */
- buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
- : chunk_list[i].old_chunk.length;
- chunk_list[i].new_chunk.length = buf_size;
+ /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
+ * in the dataspace */
+
+ /* If this is a full overwrite of this chunk, enough memory must be allocated for
+ * the size of the unfiltered chunk. Otherwise, enough memory must be allocated
+ * to read the filtered chunk from the file. */
+ /* XXX: Return value of macro should be checked instead */
+ buf_size = (full_overwrite) ? (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_list[i].chunk_info.fspace) * type_info->src_type_size
+ : chunk_list[i].old_chunk.length;
+ chunk_list[i].new_chunk.length = buf_size;
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
- full_overwrite ? "full" : "non-full");
+ HDfprintf(debug_file, "| - Allocing %zd bytes for %s chunk overwrite buffer.\n", buf_size,
+ full_overwrite ? "full" : "non-full");
#endif
- if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
-
- /* Initialize iterator for memory selection */
- if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
- mem_iter_init = TRUE;
+ if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
- /* Owner of this chunk, receive modification data from other processes */
+ /* Owner of this chunk, receive modification data from other processes */
- if (!full_overwrite) {
- /* Read the chunk from the file */
- if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
- buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
- HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
+ if (!full_overwrite) {
+ /* Read the chunk from the file */
+ if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_list[i].old_chunk.offset,
+ buf_size, H5AC_rawdata_dxpl_id, chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
- /* Unfilter the chunk before modifying it */
- if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
- io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
- (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
- HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
+ /* Unfilter the chunk before modifying it */
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
+ HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_list[i].new_chunk.length, buf_size);
- HDfprintf(debug_file, "| - Read buf:\n| - [");
- for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
+ HDfprintf(debug_file, "| - Read buf:\n| - [");
+ for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
+ if (j > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
+ }
+ HDfprintf(debug_file, "]\n|\n");
#endif
- } /* end if */
+ } /* end if */
- /* Update the chunk data with the modifications from the current (owning) process */
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ /* Update the chunk data with the modifications from the current (owning) process */
+
+ /* Initialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
- if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
- (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_list[i].chunk_info.mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, chunk_list[i].buf))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather from write buffer")
- /* Update the chunk data with any modifications from other processes */
+ /* Update the chunk data with any modifications from other processes */
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Write Buffer:\n");
- HDfprintf(debug_file, "| - [");
- for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
+ HDfprintf(debug_file, "| - Write Buffer:\n");
+ HDfprintf(debug_file, "| - [");
+ for (size_t j = 0; j < chunk_list[i].new_chunk.length / type_info->src_type_size; j++) {
+ if (j > 0) HDfprintf(debug_file, ", ");
+ HDfprintf(debug_file, "%lld", ((long *) chunk_list[i].buf)[j]);
+ }
+ HDfprintf(debug_file, "]\n|\n");
- HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
+ HDfprintf(debug_file, "| - Write buf is at address %a.\n|\n", chunk_list[i].buf);
- HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
+ HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_list[i].new_chunk.length, buf_size);
#endif
- /* Filter the chunk */
- if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
- io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
- (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
- HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
+ /* Filter the chunk */
+ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
+ io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
+ (size_t *) &chunk_list[i].new_chunk.length, &buf_size, &chunk_list[i].buf) < 0)
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
#if H5_SIZEOF_SIZE_T > 4
- /* Check for the chunk expanding too much to encode in a 32-bit value */
- if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
- HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
+ /* Check for the chunk expanding too much to encode in a 32-bit value */
+ if (chunk_list[i].new_chunk.length > ((size_t) 0xffffffff))
+ HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
#endif
- if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
- } /* end for */
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ } /* end for */
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "---------------------------------------------------\n\n");
+ HDfprintf(debug_file, "---------------------------------------------------\n\n");
#endif
- /* Gather the new chunk sizes to all processes for a collective reallocation
- * of the chunks in the file */
- if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
- (void **) &total_array, &total_array_num_entries, NULL) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
+ /* Gather the new chunk sizes to all processes for a collective reallocation
+ * of the chunks in the file */
+ if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
+ (void **) &total_array, &total_array_num_entries, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Reallocing chunks:\n");
- HDfprintf(debug_file, "------------------------------\n");
+ HDfprintf(debug_file, "Reallocing chunks:\n");
+ HDfprintf(debug_file, "------------------------------\n");
#endif
- /* Collectively re-allocate the modified chunks (from each process) in the file */
- for (i = 0; i < total_array_num_entries; i++) {
- hbool_t insert;
+ /* Collectively re-allocate the modified chunks (from each process) in the file */
+ for (i = 0; i < total_array_num_entries; i++) {
+ hbool_t insert;
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length);
+ HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", total_array[i].new_chunk.offset, total_array[i].new_chunk.length);
#endif
- if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk,
- &insert, total_array[i].chunk_info.scaled) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
+ if (H5D__chunk_file_alloc(&index_info, &total_array[i].old_chunk, &total_array[i].new_chunk,
+ &insert, total_array[i].chunk_info.scaled) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk);
+ HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", total_array[i].new_chunk);
#endif
- } /* end for */
+ } /* end for */
#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "------------------------------\n\n");
-#endif
+ HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+
+ /* XXX: During the collective re-allocation of chunks in the file, the record for each
+ * chunk is only update in the total array, not in the local copy of chunks on each
+ * process. However, each process needs the updated chunk records so that they can create
+ * a MPI type for the collective write that will write to the chunk's new locations instead
+ * of the old ones. This ugly hack seems to be the best solution to copy the information
+ * back to the local array and avoid having to modify the collective write type function
+ * in an ugly way so that it will accept the total array instead of the local array.
+ * This works correctly because the array gather function guarantees that the chunk
+ * data in the total array is ordered in blocks by rank.
+ */
+ {
+ size_t offset;
- /* XXX: During the collective re-allocation of chunks in the file, the record for each
- * chunk is only update in the total array, not in the local copy of chunks on each
- * process. However, each process needs the updated chunk records so that they can create
- * a MPI type for the collective write that will write to the chunk's new locations instead
- * of the old ones. This ugly hack seems to be the best solution to copy the information
- * back to the local array and avoid having to modify the collective write type function
- * in an ugly way so that it will accept the total array instead of the local array.
- * This works correctly because the array gather function guarantees that the chunk
- * data in the total array is ordered in blocks by rank.
- */
- {
- size_t offset;
+ /* XXX: No need to use bytes here, should be able to simply find offset in
+ * terms of H5D_filtered_collective_io_info_t's */
- offset = 0;
- for (i = 0; i < (size_t) mpi_rank; i++)
- offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t);
+ offset = 0;
+ for (i = 0; i < (size_t) mpi_rank; i++)
+ offset += num_chunks_selected_array[i] * sizeof(H5D_filtered_collective_io_info_t);
- HDmemcpy(chunk_list, &((char *) total_array)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
- }
+ HDmemcpy(chunk_list, &((char *) total_array)[offset], num_chunks_selected_array[mpi_rank] * sizeof(H5D_filtered_collective_io_info_t));
+ }
- /* Create single MPI type encompassing each selection in the dataspace */
- if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
- &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
+ /* Create single MPI type encompassing each selection in the dataspace */
+ if (H5D__mpio_filtered_collective_write_type(chunk_list, chunk_list_num_entries,
+ &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't create MPI link chunk I/O type")
- /* Override the write buffer to point to the address of the first
- * chunk data buffer */
+ /* Override the write buffer to point to the address of the first
+ * chunk data buffer */
+ /* XXX: Find a better solution, but processes with no chunks on them
+ * are a special case so they have a NULL buf */
+ if (mem_type_is_derived && file_type_is_derived)
io_info->u.wbuf = chunk_list[0].buf;
- } /* end if */
- else { /* Filtered collective read */
-
- } /* end else */
-
- /* We have a single, complicated MPI datatype for both memory & file */
- mpi_buf_count = (hsize_t) 1;
- }
- else { /* No selection at all for this process, contribute none type */
- size_t dataset_num_chunks;
-
- /* Retrieve total # of chunks in dataset */
- H5_CHECKED_ASSIGN(dataset_num_chunks, size_t, fm->layout->u.chunk.nchunks, hsize_t);
-
- /* Allocate chunking information */
- if (NULL == (total_chunk_addr_array = (haddr_t *) H5MM_malloc(sizeof(haddr_t) * dataset_num_chunks)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate total chunk address arraybuffer")
-
- /* Retrieve chunk address map */
- if (H5D__chunk_addrmap(io_info, total_chunk_addr_array) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get chunk address")
-
- /* Get chunk with lowest address */
- ctg_store.contig.dset_addr = HADDR_MAX;
- for (i = 0; i < dataset_num_chunks; i++)
- if (total_chunk_addr_array[i] < ctg_store.contig.dset_addr)
- ctg_store.contig.dset_addr = total_chunk_addr_array[i];
- HDassert(ctg_store.contig.dset_addr != HADDR_MAX);
-
- /* Set the MPI datatype */
- file_type = MPI_BYTE;
- mem_type = MPI_BYTE;
+ } /* end if */
+ else { /* Filtered collective read */
- /* No chunks selected for this process */
- mpi_buf_count = (hsize_t) 0;
} /* end else */
+ /* We have a single, complicated MPI datatype for both memory & file */
+ mpi_buf_count = (mem_type_is_derived && file_type_is_derived) ? (hsize_t) 1 : (hsize_t) 0;
+
/* Set up the base storage address for this operation */
ctg_store.contig.dset_addr = 0;
io_info->store = &ctg_store;
@@ -2005,7 +1987,7 @@ if(H5DEBUG(D))
HDfprintf(debug_file, "-----------------------------------------\n");
for (size_t k = 0; k < chunk_list_num_entries; k++) {
HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
- chunk_list[k].old_chunk.offset, chunk_list[k].num_writers);
+ chunk_list[k].old_chunk.offset, chunk_list[k].num_writers - 1);
}
HDfprintf(debug_file, "-----------------------------------------\n\n");
@@ -2013,6 +1995,8 @@ if(H5DEBUG(D))
HDfprintf(debug_file, "---------------------------------------------------\n");
#endif
+ /* XXX: Iteration should be for the max number among processes, since a process could
+ * have no chunks assigned to it */
for (i = 0; i < chunk_list_num_entries; i++) {
unsigned filter_mask = 0;
hbool_t full_overwrite = TRUE;
@@ -2042,11 +2026,6 @@ if(H5DEBUG(D))
if (NULL == (chunk_list[i].buf = H5MM_malloc(buf_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification buffer")
- /* Initialize iterator for memory selection */
- if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
- mem_iter_init = TRUE;
-
/* Owner of this chunk, receive modification data from other processes */
if (!full_overwrite) {
@@ -2074,6 +2053,12 @@ if(H5DEBUG(D))
} /* end if */
/* Update the chunk data with the modifications from the current (owning) process */
+
+ /* Initialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_list[i].chunk_info.mspace, type_info->dst_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_list[i].chunk_info.mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
@@ -2981,9 +2966,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
size_t **_num_chunks_selected_array)
{
H5D_filtered_collective_io_info_t *local_info_array = NULL;
+ H5D_filtered_collective_io_info_t *overlap_array = NULL;
H5SL_node_t *chunk_node;
hbool_t no_overlap = FALSE;
size_t num_chunks_selected;
+ size_t overlap_array_num_entries;
size_t *num_chunks_selected_array = NULL;
int mpi_rank, mpi_size, mpi_code;
herr_t ret_value = SUCCEED;
@@ -3003,17 +2990,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Get the no overlap property */
- /* Redistribute chunks to new owners as necessary */
- if (!no_overlap) {
-
- }
-
if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(H5SL_count(fm->sel_chunks) * sizeof(*local_info_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
- num_chunks_selected = 0;
chunk_node = H5SL_first(fm->sel_chunks);
- while (chunk_node) {
+ for (num_chunks_selected = 0; chunk_node; num_chunks_selected++) {
H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
H5D_chunk_ud_t udata;
@@ -3021,17 +3002,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
-#ifdef PARALLEL_COMPRESS_DEBUG
- local_info_array[num_chunks_selected].num_writers = 0;
-#endif
-
- local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
local_info_array[num_chunks_selected].chunk_info = *chunk_info;
+ local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
+ local_info_array[num_chunks_selected].io_size = H5S_GET_SELECT_NPOINTS(chunk_info->mspace) * type_info->src_type_size;
+ local_info_array[num_chunks_selected].num_writers = 0;
+ local_info_array[num_chunks_selected].owner = mpi_rank;
local_info_array[num_chunks_selected].buf = NULL;
- num_chunks_selected++;
chunk_node = H5SL_next(chunk_node);
- } /* end while */
+ } /* end for */
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, " Contents of local info array\n");
@@ -3069,6 +3048,85 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HDfprintf(debug_file, "-----------------------------------\n\n");
#endif
+ /* Redistribute chunks to new owners as necessary */
+ if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) {
+ size_t i;
+
+ if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
+ sizeof(*local_info_array), &overlap_array, &overlap_array_num_entries,
+ H5D__cmp_filtered_collective_io_entry) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't ")
+
+ for (i = 0, num_chunks_selected = 0; i < overlap_array_num_entries;) {
+ H5D_filtered_collective_io_info_t chunk_entry;
+ haddr_t chunk_addr = overlap_array[i].old_chunk.offset;
+ size_t num_writers = 0;
+ size_t max_bytes = 0;
+ int new_owner = 0;
+
+ /* Process duplicate entries caused by another process writing
+ * to the same chunk */
+ do {
+ /* Store the correct chunk entry information in case this process
+ * become's the new chunk's owner */
+ if (mpi_rank == overlap_array[i].owner)
+ chunk_entry = overlap_array[i];
+
+ /* New owner of the chunk is determined by the process
+ * which is writing the most data to the chunk */
+ if (overlap_array[i].io_size > max_bytes) {
+ max_bytes = overlap_array[i].io_size;
+ new_owner = overlap_array[i].owner;
+ }
+
+ num_writers++;
+ i++;
+
+ if (i == overlap_array_num_entries) break;
+ } while (overlap_array[i].old_chunk.offset == chunk_addr);
+
+ if (mpi_rank == new_owner) {
+ /* Make sure the new owner will know how many other processes will
+ * be sending chunk modification data to it */
+ chunk_entry.num_writers = num_writers;
+
+ /* New owner takes possession of the chunk */
+ overlap_array[num_chunks_selected++] = chunk_entry;
+ }
+ else {
+ /* Send modification data to new owner */
+
+ }
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "Chunk at address %a re-assigned to process %d.\n", chunk_addr, new_owner);
+#endif
+ }
+
+ /* Release old list */
+ if (local_info_array)
+ H5MM_free(local_info_array);
+
+ /* Local info list becomes modified (redistributed) chunk list */
+ local_info_array = overlap_array;
+
+#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "This process now has %d chunks selected after redistribution.\n\n", num_chunks_selected);
+
+ HDfprintf(debug_file, " Contents of local info array (after redistribution)\n");
+ HDfprintf(debug_file, "------------------------------\n");
+ for (size_t j = 0; j < (size_t) num_chunks_selected; j++) {
+ HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
+ HDfprintf(debug_file, "| - Chunk Address: %a\n", local_info_array[j].old_chunk.offset);
+ HDfprintf(debug_file, "| - Chunk Length: %zd\n", local_info_array[j].old_chunk.length);
+ HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace);
+ HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
+ HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
+ }
+ HDfprintf(debug_file, "------------------------------\n\n");
+#endif
+ }
+
/* Gather the number of chunks each process is writing to all processes */
if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
@@ -3113,8 +3171,8 @@ done:
*/
static herr_t
H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chunk_list,
- size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_created,
- MPI_Datatype *new_file_type, hbool_t *file_type_created)
+ size_t num_entries, MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
+ MPI_Datatype *new_file_type, hbool_t *file_type_derived)
{
MPI_Aint *write_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */
MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */
@@ -3125,9 +3183,9 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
HDassert(chunk_list);
HDassert(new_mem_type);
- HDassert(mem_type_created);
+ HDassert(mem_type_derived);
HDassert(new_file_type);
- HDassert(file_type_created);
+ HDassert(file_type_derived);
if (num_entries > 0) {
size_t i;
@@ -3178,14 +3236,14 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
/* Create memory MPI type */
if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
- *mem_type_created = TRUE;
+ *mem_type_derived = TRUE;
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
/* Create file MPI type */
if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, new_file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
- *file_type_created = TRUE;
+ *file_type_derived = TRUE;
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
} /* end if */