diff options
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 96 |
1 file changed, 60 insertions, 36 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 9239b21..60865c2 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -106,6 +106,7 @@ typedef struct H5D_filtered_collective_io_info_t { H5D_chunk_info_t chunk_info; H5F_block_t old_chunk; H5F_block_t new_chunk; + hbool_t full_overwrite; size_t io_size; size_t num_writers; int owner; @@ -366,7 +367,6 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array, FUNC_ENTER_STATIC HDassert(io_info); - HDassert(local_array); HDassert(_gathered_array); HDassert(_gathered_array_num_entries); @@ -414,8 +414,8 @@ H5D__mpio_array_gather(const H5D_io_info_t *io_info, void *local_array, HDfprintf(debug_file, "------------------------------\n"); for (size_t j = 0; j < (size_t) gathered_array_num_entries; j++) { HDfprintf(debug_file, "| Chunk Entry %zd:\n", j); - HDfprintf(debug_file, "| - Chunk Address: %a\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].old_chunk.offset); - HDfprintf(debug_file, "| - Chunk Length: %zd\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].old_chunk.length); + HDfprintf(debug_file, "| - Chunk Address: %a\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].new_chunk.offset); + HDfprintf(debug_file, "| - Chunk Length: %zd\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].new_chunk.length); HDfprintf(debug_file, "| - Address of mspace: %x\n", ((H5D_filtered_collective_io_info_t *) gathered_array)[j].chunk_info.mspace); } HDfprintf(debug_file, "------------------------------\n\n"); @@ -2687,6 +2687,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ size_t num_send_requests = 0; size_t num_chunks_selected; size_t overlap_info_array_num_entries; + size_t i; int mpi_rank, mpi_size, mpi_code; herr_t ret_value = SUCCEED; @@ -2705,32 +2706,41 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ /* Get the no overlap property */ - - if (NULL == (local_info_array = 
(H5D_filtered_collective_io_info_t *) H5MM_malloc(H5SL_count(fm->sel_chunks) * sizeof(*local_info_array)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") - - chunk_node = H5SL_first(fm->sel_chunks); - for (num_chunks_selected = 0; chunk_node; num_chunks_selected++) { - H5D_chunk_info_t *chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node); + if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) { + H5D_chunk_info_t *chunk_info; H5D_chunk_ud_t udata; hssize_t select_npoints; + hssize_t chunk_npoints; + + if (NULL == (local_info_array = (H5D_filtered_collective_io_info_t *) H5MM_malloc(num_chunks_selected * sizeof(*local_info_array)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer") - /* Obtain this chunk's address */ - if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") + chunk_node = H5SL_first(fm->sel_chunks); + for (i = 0; chunk_node; i++) { + chunk_info = (H5D_chunk_info_t *) H5SL_item(chunk_node); - local_info_array[num_chunks_selected].chunk_info = *chunk_info; - local_info_array[num_chunks_selected].old_chunk = local_info_array[num_chunks_selected].new_chunk = udata.chunk_block; - local_info_array[num_chunks_selected].num_writers = 0; - local_info_array[num_chunks_selected].owner = mpi_rank; - local_info_array[num_chunks_selected].buf = NULL; + /* Obtain this chunk's address */ + if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") - if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") - local_info_array[num_chunks_selected].io_size = (size_t) select_npoints * type_info->src_type_size; + local_info_array[i].chunk_info = *chunk_info; + 
local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block; + local_info_array[i].num_writers = 0; + local_info_array[i].owner = mpi_rank; + local_info_array[i].buf = NULL; - chunk_node = H5SL_next(chunk_node); - } /* end for */ + if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + local_info_array[i].io_size = (size_t) select_npoints * type_info->src_type_size; + + if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + local_info_array[i].full_overwrite = + (local_info_array[i].io_size == (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE; + + chunk_node = H5SL_next(chunk_node); + } /* end for */ + } /* end if */ #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, " Contents of local info array\n"); @@ -2742,6 +2752,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ HDfprintf(debug_file, "| - Address of mspace: %x\n", local_info_array[j].chunk_info.mspace); HDfprintf(debug_file, "| - Chunk Selection Type: %d\n", H5S_GET_SELECT_TYPE(local_info_array[j].chunk_info.mspace)); HDfprintf(debug_file, "| - Chunk Num Elmts Sel.: %zd\n", H5S_GET_SELECT_NPOINTS(local_info_array[j].chunk_info.mspace)); + HDfprintf(debug_file, "| - Chunk write status: %s\n", (local_info_array[i].full_overwrite) ? 
"overwrite" : "update"); } HDfprintf(debug_file, "------------------------------\n\n"); @@ -2767,10 +2778,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ /* Redistribute shared chunks to new owners as necessary */ if (!no_overlap && (io_info->op_type == H5D_IO_OP_WRITE)) { - size_t i; - - if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests)))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer") + if (num_chunks_selected) + if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer") if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") @@ -2784,6 +2794,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < overlap_info_array_num_entries;) { H5D_filtered_collective_io_info_t chunk_entry; haddr_t chunk_addr = overlap_info_array[i].old_chunk.offset; + size_t total_io_size = 0; size_t num_writers = 0; size_t max_bytes = 0; int new_owner = 0; @@ -2796,6 +2807,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ * contributing to the chunk and so will not try to access an invalid * dataspace when processes are sending chunk data to new owners */ chunk_entry.chunk_info.fspace = NULL; + chunk_entry.io_size = 0; /* Process duplicate entries caused by another process writing * to the same chunk @@ -2820,12 +2832,28 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ num_writers++; } while (++i < overlap_info_array_num_entries && overlap_info_array[i].old_chunk.offset == chunk_addr); + /* Determine the total IO size to the chunk by all processes combined */ 
+ if (MPI_SUCCESS != (mpi_code = MPI_Reduce(&chunk_entry.io_size, &total_io_size, + 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, new_owner, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "MPI_reduce failed", mpi_code) + if (mpi_rank == new_owner) { + hssize_t chunk_npoints; + /* Make sure the new owner will know how many other processes will * be sending chunk modification data to it */ chunk_entry.num_writers = num_writers; + /* Set the full chunk overwrite status. For simplicity, assume that this is + * a full overwrite of the chunk if the total IO size is equal to the size + * of the chunk and regard overlapping writes as an error. + */ + if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry.chunk_info.fspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + chunk_entry.full_overwrite = (total_io_size == (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE; + /* New owner takes possession of the chunk */ overlap_info_array[num_chunks_selected++] = chunk_entry; } /* end if */ @@ -3091,7 +3119,6 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ unsigned filter_mask = 0; hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ - hbool_t full_overwrite = FALSE; /* Whether this is a full overwrite of this chunk */ hbool_t mem_iter_init = FALSE; size_t buf_size; H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */ @@ -3107,25 +3134,20 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk HDfprintf(debug_file, "| Chunk at address %a:\n", chunk_entry->old_chunk.offset); #endif - /* XXX: Determine if a chunk is being fully overwritten by looking at the total selection - * in the dataspace. Possibly receive all data ahead of time so that the dataspaces can - * be ORed? 
- */ - /* If this is a read operation or a write operation where the chunk is not being fully * overwritten, enough memory must be allocated to read the filtered chunk from the file. * If this is a write operation where the chunk is being fully overwritten, enough memory * must be allocated for the size of the unfiltered chunk. */ /* XXX: Return value of macro should be checked */ - buf_size = (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length + buf_size = (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) ? chunk_entry->old_chunk.length : (hsize_t) H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace) * type_info->src_type_size; chunk_entry->new_chunk.length = buf_size; #ifdef PARALLEL_COMPRESS_DEBUG HDfprintf(debug_file, "| - Allocing %zd bytes for chunk data buffer.\n", buf_size); if (io_info->op_type == H5D_IO_OP_WRITE) - HDfprintf(debug_file, "| - Write type is: %s.\n", (full_overwrite) ? "overwrite" : "non-overwrite"); + HDfprintf(debug_file, "| - Write type is: %s.\n", (chunk_entry->full_overwrite) ? "overwrite" : "update"); #endif if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size))) @@ -3134,7 +3156,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk /* If this is not a full chunk overwrite or this is a read operation, the chunk must be * read from the file and unfiltered. 
*/ - if (!full_overwrite || io_info->op_type == H5D_IO_OP_READ) { + if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset, buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0) HGOTO_ERROR(H5E_IO, H5E_READERROR, H5_ITER_ERROR, "unable to read raw data chunk") @@ -3145,6 +3167,8 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "couldn't unfilter chunk for modifying") #ifdef PARALLEL_COMPRESS_DEBUG + HDfprintf(debug_file, "| - Read chunk from file.\n"); + HDfprintf(debug_file, "| - After decompression: Nbytes=%zd; Buf_size=%zd.\n", chunk_entry->new_chunk.length, buf_size); HDfprintf(debug_file, "| - Read buf:\n| - ["); |