summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/H5Dmpio.c47
-rw-r--r--src/H5err.txt1
2 files changed, 23 insertions, 25 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 60865c2..63e0417 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -1393,7 +1393,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
/* XXX: Not sure about minor error code */
for (i = 0; i < chunk_list_num_entries; i++)
if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry")
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "---------------------------------------------------\n\n");
@@ -1402,10 +1402,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
/* Gather the new chunk sizes to all processes for a collective reallocation
* of the chunks in the file.
*/
- /* XXX: change minor error code */
if (H5D__mpio_array_gather(io_info, chunk_list, chunk_list_num_entries, sizeof(*chunk_list),
(void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Reallocing chunks:\n");
@@ -1866,10 +1865,10 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
io_info->store = &store;
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
- /* XXX: Change minor error code */
+ /* XXX: Test with MPI types and collective read to improve performance */
for (i = 0; i < chunk_list_num_entries; i++)
if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry")
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry")
} /* end if */
else { /* Filtered collective write */
H5D_chk_idx_info_t index_info;
@@ -1930,19 +1929,17 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
/* Check if this process has a chunk to work on for this iteration */
hbool_t have_chunk_to_process = i < chunk_list_num_entries;
- /* XXX: Not sure about minor error code */
if (have_chunk_to_process)
if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't process chunk entry")
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
/* Gather the new chunk sizes to all processes for a collective re-allocation
* of the chunks in the file
*/
/* XXX: May access unavailable memory on processes with no selection */
- /* XXX: change minor error code */
if (H5D__mpio_array_gather(io_info, &chunk_list[i], have_chunk_to_process ? 1 : 0, sizeof(*chunk_list),
(void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather new chunk sizes")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Reallocing chunks:\n");
@@ -2736,7 +2733,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
local_info_array[i].full_overwrite =
- (local_info_array[i].io_size == (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
+ (local_info_array[i].io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
chunk_node = H5SL_next(chunk_node);
} /* end for */
@@ -2752,7 +2749,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace);
HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| - Chunk write status: %s\n", (local_info_array[i].full_overwrite) ? "overwrite" : "update");
+ HDfprintf(debug_file, "| - Chunk write status: %s\n", (local_info_array[j].full_overwrite) ? "overwrite" : "update");
}
HDfprintf(debug_file, "------------------------------\n\n");
@@ -2785,11 +2782,10 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
- /* XXX: Change minor error code */
if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
sizeof(*local_info_array), (void **) &overlap_info_array, &overlap_info_array_num_entries,
H5D__cmp_filtered_collective_io_info_entry) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTRECV, FAIL, "couldn't gather array")
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < overlap_info_array_num_entries;) {
H5D_filtered_collective_io_info_t chunk_entry;
@@ -2807,7 +2803,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
* contributing to the chunk and so will not try to access an invalid
* dataspace when processes are sending chunk data to new owners */
chunk_entry.chunk_info.fspace = NULL;
- chunk_entry.io_size = 0;
/* Process duplicate entries caused by another process writing
* to the same chunk
@@ -2821,6 +2816,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (mpi_rank == overlap_info_array[i].owner)
chunk_entry = overlap_info_array[i];
+ /* Add this chunk entry's IO size to the running total */
+ total_io_size += overlap_info_array[i].io_size;
+
/* New owner of the chunk is determined by the process
* which is writing the most data to the chunk
*/
@@ -2832,11 +2830,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
num_writers++;
} while (++i < overlap_info_array_num_entries && overlap_info_array[i].old_chunk.offset == chunk_addr);
- /* Determine the total IO size to the chunk by all processes combined */
- if (MPI_SUCCESS != (mpi_code = MPI_Reduce(&chunk_entry.io_size, &total_io_size,
- 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, new_owner, io_info->comm)))
- HMPI_GOTO_ERROR(FAIL, "MPI_reduce failed", mpi_code)
-
if (mpi_rank == new_owner) {
hssize_t chunk_npoints;
@@ -2845,14 +2838,17 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
*/
chunk_entry.num_writers = num_writers;
- /* Set the full chunk overwrite status. For simplicity, assume that this is
- * a full overwrite of the chunk if the total IO size is equal to the size
- * of the chunk and regard overlapping writes as an error.
+ /* Set the full chunk overwrite status. It is assumed that this is a full
+ * overwrite of the chunk if the total IO size is equal to the size of the
+ * chunk. If the IO size is greater than the size of the chunk, there is an
+ * overlapping write between processes, meaning there is no guarantee on
+ * the integrity of data in the write operation. However, this still
+ * represents a full overwrite of the chunk.
*/
if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry.chunk_info.fspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
- chunk_entry.full_overwrite = (total_io_size == (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
+ chunk_entry.full_overwrite = (total_io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
/* New owner takes possession of the chunk */
overlap_info_array[num_chunks_selected++] = chunk_entry;
@@ -3159,12 +3155,12 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset,
buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
- HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
+ HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk")
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
(size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
- HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
+ HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| - Read chunk from file.\n");
@@ -3214,6 +3210,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
+ /* XXX: Implement re-alloc strategy too avoid too many malloc/frees */
/* Update the chunk data with any modifications from other processes */
while (chunk_entry->num_writers > 1) {
const unsigned char *mod_data_p; /* Use second pointer since H5S_decode advances pointer */
diff --git a/src/H5err.txt b/src/H5err.txt
index 44c5a93..9aa2e6b 100644
--- a/src/H5err.txt
+++ b/src/H5err.txt
@@ -244,6 +244,7 @@ MINOR, LINK, H5E_CANTSORT, Can't sort objects
MINOR, MPI, H5E_MPI, Some MPI function failed
MINOR, MPI, H5E_MPIERRSTR, MPI Error String
MINOR, MPI, H5E_CANTRECV, Can't receive data
+MINOR, MPI, H5E_CANTGATHER, Can't gather data
# Heap errors
MINOR, HEAP, H5E_CANTRESTORE, Can't restore condition