summary refs log tree commit diff stats
path: root/src/H5Dmpio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- src/H5Dmpio.c 348
1 files changed, 6 insertions, 342 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 100aa97..af5d44b 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -86,12 +86,6 @@
#define H5D_CHUNK_SELECT_IRREG 2
#define H5D_CHUNK_SELECT_NONE 0
-#define PARALLEL_COMPRESS_DEBUG
-
-#ifdef PARALLEL_COMPRESS_DEBUG
-FILE *debug_file;
-#endif
-
/******************/
/* Local Typedefs */
/******************/
@@ -355,10 +349,10 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
void **_gathered_array, size_t *_gathered_array_num_entries,
int (*sort_func)(const void *, const void *))
{
- size_t gathered_array_num_entries = 0;
+ size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */
size_t i;
- void *gathered_array = NULL;
- int *receive_counts_array = NULL; /* Array containing number of entries each process contributes */
+ void *gathered_array = NULL; /* The newly-constructed array returned to the caller */
+ int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */
int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
int mpi_code, mpi_size;
int sendcount;
@@ -412,18 +406,6 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
*_gathered_array = gathered_array;
*_gathered_array_num_entries = gathered_array_num_entries;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, " Contents of gathered array:\n");
- HDfprintf(debug_file, "------------------------------\n");
- for (size_t j = 0; j < (size_t) gathered_array_num_entries; j++) {
- HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
- HDfprintf(debug_file, "| - Chunk Address: %a\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].new_chunk.offset);
- HDfprintf(debug_file, "| - Chunk Length: %zd\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].new_chunk.length);
- HDfprintf(debug_file, "| - Address of mspace: %x\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].chunk_info.mspace);
- }
- HDfprintf(debug_file, "------------------------------\n\n");
-#endif
-
done:
if (receive_counts_array)
H5MM_free(receive_counts_array);
@@ -916,31 +898,11 @@ H5D__chunk_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type_
FUNC_ENTER_PACKAGE
-#ifdef PARALLEL_COMPRESS_DEBUG
- char name[10];
-
- snprintf(name, 10, "out - %d", H5F_mpi_get_rank(io_info->dset->oloc.file));
-
- debug_file = fopen(name, "a");
-
- HDfprintf(debug_file, "**************************\n");
- HDfprintf(debug_file, "* Starting write\n");
- HDfprintf(debug_file, "**************************\n\n");
-#endif
-
/* Call generic selection operation */
if(H5D__chunk_collective_io(io_info, type_info, fm) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_WRITEERROR, FAIL, "write error")
done:
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "**************************\n");
- HDfprintf(debug_file, "* Finished write\n");
- HDfprintf(debug_file, "**************************\n\n");
-
- fclose(debug_file);
-#endif
-
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_write() */
@@ -1367,16 +1329,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
if (H5D__construct_filtered_io_info_list(io_info, type_info, fm, &chunk_list, &chunk_list_num_entries) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Incoming messages from other processes:\n");
- HDfprintf(debug_file, "-----------------------------------------\n");
- for (size_t j = 0; j < chunk_list_num_entries; j++) {
- HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
- chunk_list[j].old_chunk.offset, chunk_list[j].num_writers - 1);
- }
- HDfprintf(debug_file, "-----------------------------------------\n\n");
-#endif
-
if (io_info->op_type == H5D_IO_OP_WRITE) { /* Filtered collective write */
H5D_chk_idx_info_t index_info;
H5D_chunk_ud_t udata;
@@ -1394,12 +1346,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
udata.common.storage = index_info.storage;
udata.filter_mask = 0;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Processing chunks:\n");
- HDfprintf(debug_file, "---------------------------------------------------\n");
-#endif
-
/* Iterate through all the chunks in the collective write operation,
* updating each chunk with the data modifications from other processes,
* then re-filtering the chunk.
@@ -1408,10 +1354,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "---------------------------------------------------\n\n");
-#endif
-
/* Gather the new chunk sizes to all processes for a collective reallocation
* of the chunks in the file.
*/
@@ -1419,32 +1361,15 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
(void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Reallocing chunks:\n");
- HDfprintf(debug_file, "------------------------------\n");
-#endif
-
/* Collectively re-allocate the modified chunks (from each process) in the file */
for (i = 0; i < collective_chunk_list_num_entries; i++) {
hbool_t insert;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Reallocing chunk at address %a with new length of %zd.\n", collective_chunk_list[i].new_chunk.offset, collective_chunk_list[i].new_chunk.length);
-#endif
-
if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].old_chunk, &collective_chunk_list[i].new_chunk,
&insert, collective_chunk_list[i].chunk_info.scaled) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Chunk now at address %a.\n|\n", collective_chunk_list[i].new_chunk);
-#endif
} /* end for */
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "------------------------------\n\n");
-#endif
-
if (NULL == (num_chunks_selected_array = (size_t *) H5MM_malloc((size_t) mpi_size * sizeof(*num_chunks_selected_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
@@ -1452,15 +1377,6 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
1, MPI_UNSIGNED_LONG_LONG, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, " Num Chunks Selected Array\n");
- HDfprintf(debug_file, "------------------------------------\n");
- for (size_t j = 0; j < (size_t) mpi_size; j++) {
- HDfprintf(debug_file, "| Process %d has %zd chunks selected.\n", j, num_chunks_selected_array[j]);
- }
- HDfprintf(debug_file, "------------------------------------\n\n");
-#endif
-
/* If this process has any chunks selected, create a MPI type for collectively
* writing out the chunks to file. Otherwise, the process contributes to the
* collective write with a none type.
@@ -1508,31 +1424,14 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
/* Participate in the collective re-insertion of all chunks modified
* in this iteration into the chunk index
*/
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
- HDfprintf(debug_file, "---------------------------------------\n");
-#endif
-
for (i = 0; i < collective_chunk_list_num_entries; i++) {
udata.chunk_block = collective_chunk_list[i].new_chunk;
udata.common.scaled = collective_chunk_list[i].chunk_info.scaled;
udata.chunk_idx = collective_chunk_list[i].chunk_info.index;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Reinserting chunk at index %llu.\n", udata.chunk_idx);
-#endif
-
if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Successfully inserted chunk at address %a into the chunk index at index %llu.\n", udata.chunk_block.offset, udata.chunk_idx);
-#endif
} /* end for */
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "---------------------------------------\n");
-#endif
} /* end if */
done:
@@ -1882,7 +1781,6 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
io_info->store = &store;
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
- /* XXX: Test with MPI types and collective read to improve performance */
for (i = 0; i < chunk_list_num_entries; i++)
if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry")
@@ -1905,19 +1803,6 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
udata.common.storage = index_info.storage;
udata.filter_mask = 0;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Incoming messages from other processes:\n");
- HDfprintf(debug_file, "-----------------------------------------\n");
- for (size_t k = 0; k < chunk_list_num_entries; k++) {
- HDfprintf(debug_file, "| Owner of chunk at address %a is expecting messages from %d other processes.\n",
- chunk_list[k].old_chunk.offset, chunk_list[k].num_writers - 1);
- }
- HDfprintf(debug_file, "-----------------------------------------\n\n");
-
- HDfprintf(debug_file, "Processing chunks:\n");
- HDfprintf(debug_file, "---------------------------------------------------\n");
-#endif
-
/* Retrieve the maximum number of chunks being written among all processes */
if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks,
1, MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm)))
@@ -1960,35 +1845,17 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
(void **) &collective_chunk_list, &collective_chunk_list_num_entries, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather new chunk sizes")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Reallocing chunks:\n");
- HDfprintf(debug_file, "------------------------------\n");
-#endif
-
/* Participate in the collective re-allocation of all chunks modified
* in this iteration.
*/
for (j = 0; j < collective_chunk_list_num_entries; j++) {
hbool_t insert = FALSE;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Re-allocing chunk at address %a with new length of %llu bytes.\n",
- collective_chunk_list[j].new_chunk.offset, collective_chunk_list[j].new_chunk.length);
-#endif
-
if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk,
&insert, chunk_list[j].chunk_info.scaled) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Chunk now at address %a.\n|\n", collective_chunk_list[j].new_chunk);
-#endif
} /* end for */
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "------------------------------\n\n");
-#endif
-
if (NULL == (has_chunk_selected_array = (hbool_t *) H5MM_malloc((size_t) mpi_size * sizeof(*has_chunk_selected_array))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate num chunks selected array")
@@ -2010,11 +1877,6 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
* collective array gets updated by the chunk re-allocation */
HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[offset].new_chunk, sizeof(chunk_list[i].new_chunk));
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "New chunk record after memcpy back to local:\n");
- HDfprintf(debug_file, " - Chunk offset: %a, Chunk length: %lld\n", chunk_list[i].new_chunk.offset, chunk_list[i].new_chunk.length);
-#endif
-
H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t);
/* Create MPI memory type for writing to chunk */
@@ -2053,32 +1915,15 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
/* Participate in the collective re-insertion of all chunks modified
* in this iteration into the chunk index
*/
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "Reinserting chunks into chunk index.\n");
- HDfprintf(debug_file, "---------------------------------------\n");
-#endif
-
for (j = 0; j < collective_chunk_list_num_entries; j++) {
udata.chunk_block = collective_chunk_list[j].new_chunk;
udata.common.scaled = collective_chunk_list[j].chunk_info.scaled;
udata.chunk_idx = collective_chunk_list[j].chunk_info.index;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Reinserting chunk at index %llu.\n", udata.chunk_idx);
-#endif
-
if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Successfully inserted chunk at address %a into the chunk index at index %llu.\n", udata.chunk_block.offset, udata.chunk_idx);
-#endif
} /* end for */
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "---------------------------------------\n");
-#endif
-
if (collective_chunk_list){
H5MM_free(collective_chunk_list);
collective_chunk_list = NULL;
@@ -2714,8 +2559,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
{
H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially select chunks for this process */
H5D_filtered_collective_io_info_t *overlap_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
- /* H5D_mpio_filtered_write_mode_t filtered_write_mode = H5D_MPIO_UNSAFE_FILTERED_WRITE; */
- /* H5P_genplist_t *dx_plist; */
H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
H5SL_node_t *chunk_node;
@@ -2779,50 +2622,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
} /* end for */
} /* end if */
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, " Contents of local info array\n");
- HDfprintf(debug_file, "------------------------------\n");
- for (size_t j = 0; j < (size_t) num_chunks_selected; j++) {
- HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
- HDfprintf(debug_file, "| - Chunk Address: %a\n", local_info_array[j].old_chunk.offset);
- HDfprintf(debug_file, "| - Chunk Length: %zd\n", local_info_array[j].old_chunk.length);
- HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace);
- HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| - Chunk write status: %s\n", (local_info_array[j].full_overwrite) ? "overwrite" : "update");
- }
- HDfprintf(debug_file, "------------------------------\n\n");
-
- HDfprintf(debug_file, "Testing mem/file space addresses:\n");
- HDfprintf(debug_file, "-----------------------------------\n");
-
- for (size_t j = 0; j < num_chunks_selected; j++) {
- HDfprintf(debug_file, "| Testing chunk at address %a.\n", local_info_array[j].old_chunk.offset);
- HDfprintf(debug_file, "| Mem Space:\n");
- HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
- HDfprintf(debug_file, "| File Space:\n");
- HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(local_info_array[j].chunk_info.fspace));
- HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(local_info_array[j].chunk_info.fspace));
- HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.fspace));
- HDfprintf(debug_file, "| - Selection type: %d\n|\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.fspace));
- }
-
- HDfprintf(debug_file, "-----------------------------------\n\n");
-#endif
-
- /* XXX: Add SAFE_FILTERED_CHUNK_WRITE to property lists */
- /* Get the no overlap property */
- /* if (NULL == (dx_plist = (H5P_genplist_t *) H5I_object(io_info->raw_dxpl_id)))
- HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list")
-
- if (H5P_get(dx_plist, H5D_MPIO_FILTERED_WRITE_MODE, &filtered_write_mode))
- HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "couldn't get filtered chunk write mode property") */
-
/* Redistribute shared chunks to new owners as necessary */
- if (io_info->op_type == H5D_IO_OP_WRITE /* && (filtered_write_mode != H5D_MPIO_SAFE_FILTERED_WRITE) */) {
+ if (io_info->op_type == H5D_IO_OP_WRITE) {
if (num_chunks_selected)
if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
@@ -2920,10 +2721,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Allocing %zd bytes for mod. data buffer.\n", (size_t) mod_data_size);
-#endif
-
if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
@@ -2937,27 +2734,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Iterating over %lld elements.\n", iter_nelmts);
-#endif
-
/* Collect the modification data into the buffer */
if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
(size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Mod. Data Buffer:\n");
- HDfprintf(debug_file, "| - [");
- for (size_t j = 0; j < (size_t) iter_nelmts; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((long *) mod_data_p)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
-
- HDfprintf(debug_file, "| Sending modification data for chunk at address %a to process %d.\n", chunk_entry.old_chunk.offset, new_owner);
-#endif
-
/* Send modification data to new owner */
H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int)
@@ -2965,9 +2746,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
(int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Mod. data sent.\n|\n");
-#endif
if (mod_data) {
H5MM_free(mod_data);
mod_data = NULL;
@@ -2976,10 +2754,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
mem_iter_init = FALSE;
} /* end else */
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Chunk at address %a re-assigned to process %d.\n|\n", chunk_addr, new_owner);
-#endif
} /* end for */
/* Release old list */
@@ -2988,29 +2762,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Local info list becomes modified (redistributed) chunk list */
local_info_array = overlap_info_array;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "This process now has %d chunks selected after redistribution.\n\n", num_chunks_selected);
-
- HDfprintf(debug_file, " Contents of local info array (after redistribution)\n");
- HDfprintf(debug_file, "------------------------------\n");
- for (size_t j = 0; j < (size_t) num_chunks_selected; j++) {
- HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
- HDfprintf(debug_file, "| - Chunk Address: %a\n", local_info_array[j].old_chunk.offset);
- HDfprintf(debug_file, "| - Chunk Length: %zd\n", local_info_array[j].old_chunk.length);
- HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.fspace);
- HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.fspace));
- HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.fspace));
- }
- HDfprintf(debug_file, "------------------------------\n\n");
-#endif
} /* end if */
*chunk_list = local_info_array;
*num_entries = num_chunks_selected;
/* Wait for all async send requests to complete before returning */
- if (/* (filtered_write_mode != H5D_MPIO_SAFE_FILTERED_WRITE) && */ num_send_requests) {
+ if (num_send_requests) {
if (NULL == (send_statuses = (MPI_Status *) H5MM_malloc(num_send_requests * sizeof(*send_statuses))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send statuses buffer")
@@ -3086,11 +2844,6 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
/* Ensure the list is sorted in ascending order of offset in the file */
HDqsort(chunk_list, num_entries, sizeof(*chunk_list), H5D__cmp_filtered_collective_io_info_entry);
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "MPI Write type entries:\n");
- HDfprintf(debug_file, "---------------------------------\n");
-#endif
-
base_buf = chunk_list[0].buf;
for (i = 0; i < num_entries; i++) {
/* Set up the offset in the file, the length of the chunk data, and the relative
@@ -3099,22 +2852,8 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
length_array[i] = (int) chunk_list[i].new_chunk.length;
write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Type Entry %zd:\n", i);
- HDfprintf(debug_file, "| - Offset: %a; Length: %zd\n", file_offset_array[i], length_array[i]);
- HDfprintf(debug_file, "| - Write buffer:\n| [");
- for (size_t j = 0; j < (size_t) length_array[i]; j++) {
- HDfprintf(debug_file, "%c, ", ((char *) chunk_list[i].buf)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
-#endif
} /* end for */
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "---------------------------------\n\n");
-#endif
-
/* Create memory MPI type */
if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, new_mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
@@ -3179,10 +2918,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HDassert(io_info);
HDassert(type_info);
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_entry->old_chunk.offset);
-#endif
-
/* If this is a read operation or a write operation where the chunk is not being fully
* overwritten, enough memory must be allocated to read the filtered chunk from the file.
* If this is a write operation where the chunk is being fully overwritten, enough memory
@@ -3202,12 +2937,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
chunk_entry->new_chunk.length = buf_size;
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Allocing %zd bytes for chunk data buffer.\n", buf_size);
- if (io_info->op_type == H5D_IO_OP_WRITE)
- HDfprintf(debug_file, "| - Write type is: %s.\n", (chunk_entry->full_overwrite) ? "overwrite" : "update");
-#endif
-
if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer")
@@ -3215,6 +2944,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
* read from the file and unfiltered.
*/
if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
+ /* XXX: Test with MPI types and collective read to improve performance */
if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset,
buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk")
@@ -3223,19 +2953,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
(size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Read chunk from file.\n");
-
- HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_entry->new_chunk.length, buf_size);
-
- HDfprintf(debug_file, "| - Read buf:\n| - [");
- for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
-#endif
} /* end if */
/* Initialize iterator for memory selection */
@@ -3307,52 +3024,26 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Found message from source %d with tag %d.\n", status.MPI_SOURCE, status.MPI_TAG);
-#endif
-
/* Retrieve the message size */
if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Message size is %d bytes.\n", count);
-#endif
-
if ((size_t) count > mod_data_alloced_bytes) {
if (NULL == (mod_data = (unsigned char *) H5MM_realloc(mod_data, (size_t) count)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
mod_data_alloced_bytes = (size_t) count;
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Re-alloced buffer.\n");
- HDfprintf(debug_file, "| - New size is: %zu.\n", mod_data_alloced_bytes);
-#endif
}
if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE,
(int) chunk_entry->chunk_info.index, io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Received the message.\n");
-#endif
-
/* Decode the process' chunk file dataspace */
mod_data_p = mod_data;
if (NULL == (dataspace = H5S_decode(&mod_data_p)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to decode dataspace")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Deserialized selection info:\n");
- HDfprintf(debug_file, "| Mem Space:\n");
- HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(dataspace));
- HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(dataspace));
- HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(dataspace));
- HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(dataspace));
-#endif
-
if (H5S_select_iter_init(mem_iter, dataspace, type_info->dst_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
@@ -3360,20 +3051,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(dataspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Contents of message:\n| [");
- for (size_t j = 0; j < (size_t) iter_nelmts; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((const long *) mod_data_p)[j]);
- }
- HDfprintf(debug_file, "]\n");
-#endif
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| Iter nelmts=%lld.\n", iter_nelmts);
- HDfprintf(debug_file, "| Mem space selected points: %zd.\n| \n", H5S_GET_SELECT_NPOINTS(dataspace));
-#endif
-
/* Update the chunk data with the received modification data */
if (H5D__scatter_mem(mod_data_p, dataspace, mem_iter, (size_t) iter_nelmts,
io_info->dxpl_cache, chunk_entry->buf) < 0)
@@ -3391,19 +3068,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
}
} /* end while */
-
-#ifdef PARALLEL_COMPRESS_DEBUG
- HDfprintf(debug_file, "| - Chunk Data Buffer:\n");
- HDfprintf(debug_file, "| - [");
- for (size_t j = 0; j < chunk_entry->new_chunk.length / type_info->src_type_size; j++) {
- if (j > 0) HDfprintf(debug_file, ", ");
- HDfprintf(debug_file, "%lld", ((long *) chunk_entry->buf)[j]);
- }
- HDfprintf(debug_file, "]\n|\n");
-
- HDfprintf(debug_file, "| - About to filter %zd bytes in buffer of size %zd.\n|\n", chunk_entry->new_chunk.length, buf_size);
-#endif
-
/* Filter the chunk */
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,