summaryrefslogtreecommitdiffstats
path: root/src/H5Dmpio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--src/H5Dmpio.c96
1 files changed, 60 insertions, 36 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 9239b21..60865c2 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -106,6 +106,7 @@ typedef struct H5D_filtered_collective_io_info_t {
H5D_chunk_info_t chunk_info;
H5F_block_t old_chunk;
H5F_block_t new_chunk;
+ hbool_t full_overwrite;
size_t io_size;
size_t num_writers;
int owner;
@@ -366,7 +367,6 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
FUNC_ENTER_STATIC
HDassert(io_info);
- HDassert(local_array);
HDassert(_gathered_array);
HDassert(_gathered_array_num_entries);
@@ -414,8 +414,8 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array,
HDfprintf(debug_file, "------------------------------\n");
for (size_t j = 0; j < (size_t) gathered_array_num_entries; j++) {
HDfprintf(debug_file, "| Chunk Entry %zd:\n", j);
- HDfprintf(debug_file, "| - Chunk Address: %a\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].old_chunk.offset);
- HDfprintf(debug_file, "| - Chunk Length: %zd\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].old_chunk.length);
+ HDfprintf(debug_file, "| - Chunk Address: %a\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].new_chunk.offset);
+ HDfprintf(debug_file, "| - Chunk Length: %zd\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].new_chunk.length);
HDfprintf(debug_file, "| - Address of mspace: %x\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].chunk_info.mspace);
}
HDfprintf(debug_file, "------------------------------\n\n");
@@ -2687,6 +2687,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
size_t num_send_requests = 0;
size_t num_chunks_selected;
size_t overlap_info_array_num_entries;
+ size_t i;
int mpi_rank, mpi_size, mpi_code;
herr_t ret_value = SUCCEED;
@@ -2705,32 +2706,41 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Get the no overlap property */
-
- if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(H5SL_count(fm->sel_chunks) * sizeof(*local_info_array))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
-
- chunk_node = H5SL_first(fm->sel_chunks);
- for (num_chunks_selected = 0; chunk_node; num_chunks_selected++) {
- H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
+ if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) {
+ H5D_chunk_info_t *chunk_info;
H5D_chunk_ud_t udata;
hssize_t select_npoints;
+ hssize_t chunk_npoints;
+
+ if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(*local_info_array))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
- /* Obtain this chunk's address */
- if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
+ chunk_node = H5SL_first(fm->sel_chunks);
+ for (i = 0; chunk_node; i++) {
+ chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node);
- local_info_array[num_chunks_selected].chunk_info = *chunk_info;
- local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block;
- local_info_array[num_chunks_selected].num_writers = 0;
- local_info_array[num_chunks_selected].owner = mpi_rank;
- local_info_array[num_chunks_selected].buf = NULL;
+ /* Obtain this chunk's address */
+ if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
- if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
- local_info_array[num_chunks_selected].io_size = (size_t) select_npoints * type_info->src_type_size;
+ local_info_array[i].chunk_info = *chunk_info;
+ local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block;
+ local_info_array[i].num_writers = 0;
+ local_info_array[i].owner = mpi_rank;
+ local_info_array[i].buf = NULL;
- chunk_node = H5SL_next(chunk_node);
- } /* end for */
+ if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ local_info_array[i].io_size = (size_t) select_npoints * type_info->src_type_size;
+
+ if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+ local_info_array[i].full_overwrite =
+ (local_info_array[i].io_size == (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
+
+ chunk_node = H5SL_next(chunk_node);
+ } /* end for */
+ } /* end if */
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, " Contents of local info array\n");
@@ -2742,6 +2752,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace);
HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace));
HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace));
+ HDfprintf(debug_file, "| - Chunk write status: %s\n", (local_info_array[i].full_overwrite) ? "overwrite" : "update");
}
HDfprintf(debug_file, "------------------------------\n\n");
@@ -2767,10 +2778,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Redistribute shared chunks to new owners as necessary */
if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) {
- size_t i;
-
- if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
+ if (num_chunks_selected)
+ if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
@@ -2784,6 +2794,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < overlap_info_array_num_entries;) {
H5D_filtered_collective_io_info_t chunk_entry;
haddr_t chunk_addr = overlap_info_array[i].old_chunk.offset;
+ size_t total_io_size = 0;
size_t num_writers = 0;
size_t max_bytes = 0;
int new_owner = 0;
@@ -2796,6 +2807,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
* contributing to the chunk and so will not try to access an invalid
* dataspace when processes are sending chunk data to new owners */
chunk_entry.chunk_info.fspace = NULL;
+ chunk_entry.io_size = 0;
/* Process duplicate entries caused by another process writing
* to the same chunk
@@ -2820,12 +2832,28 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
num_writers++;
} while (++i < overlap_info_array_num_entries && overlap_info_array[i].old_chunk.offset == chunk_addr);
+ /* Determine the total IO size to the chunk by all processes combined */
+ if (MPI_SUCCESS != (mpi_code = MPI_Reduce(&chunk_entry.io_size, &total_io_size,
+ 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, new_owner, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "MPI_reduce failed", mpi_code)
+
if (mpi_rank == new_owner) {
+ hssize_t chunk_npoints;
+
/* Make sure the new owner will know how many other processes will
* be sending chunk modification data to it
*/
chunk_entry.num_writers = num_writers;
+ /* Set the full chunk overwrite status. For simplicity, assume that this is
+ * a full overwrite of the chunk if the total IO size is equal to the size
+ * of the chunk and regard overlapping writes as an error.
+ */
+ if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry.chunk_info.fspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ chunk_entry.full_overwrite = (total_io_size == (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
+
/* New owner takes possession of the chunk */
overlap_info_array[num_chunks_selected++] = chunk_entry;
} /* end if */
@@ -3091,7 +3119,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
unsigned filter_mask = 0;
hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
- hbool_t full_overwrite = FALSE; /* Whether this is a full overwrite of this chunk */
hbool_t mem_iter_init = FALSE;
size_t buf_size;
H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
@@ -3107,25 +3134,20 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_entry->old_chunk.offset);
#endif
- /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection
- * in the dataspace. Possibly receive all data ahead of time so that the dataspaces can
- * be ORed?
- */
-
/* If this is a read operation or a write operation where the chunk is not being fully
* overwritten, enough memory must be allocated to read the filtered chunk from the file.
* If this is a write operation where the chunk is being fully overwritten, enough memory
* must be allocated for the size of the unfiltered chunk.
*/
/* XXX: Return value of macro should be checked */
- buf_size = (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length
+ buf_size = (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length
: (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size;
chunk_entry->new_chunk.length = buf_size;
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| - Allocing %zd bytes for chunk data buffer.\n", buf_size);
if (io_info->op_type == H5D_IO_OP_WRITE)
- HDfprintf(debug_file, "| - Write type is: %s.\n", (full_overwrite) ? "overwrite" : "non-overwrite");
+ HDfprintf(debug_file, "| - Write type is: %s.\n", (chunk_entry->full_overwrite) ? "overwrite" : "update");
#endif
if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
@@ -3134,7 +3156,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
/* If this is not a full chunk overwrite or this is a read operation, the chunk must be
* read from the file and unfiltered.
*/
- if (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
+ if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset,
buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk")
@@ -3145,6 +3167,8 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying")
#ifdef PARALLEL_COMPRESS_DEBUG
+ HDfprintf(debug_file, "| - Read chunk from file.\n");
+
HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_entry->new_chunk.length, buf_size);
HDfprintf(debug_file, "| - Read buf:\n| - [");