author    jhendersonHDF <jhenderson@hdfgroup.org>    2023-08-02 20:13:34 (GMT)
committer GitHub <noreply@github.com>                2023-08-02 20:13:34 (GMT)
commit    3fc5e34d71c9300ec10e87f2534bccec8df85756 (patch)
tree      c787fa4e198ac69084e0ad0b1d2b100eed06b35d
parent    179b4a0d45cdafcc385c7edec81a70633431893d (diff)
Switch parallel compression to use vector I/O (#3245) (#3327)
Updates the parallel compression feature to use vector I/O instead of creating MPI derived types and passing them down to the VFD.
-rw-r--r--  src/H5Dmpio.c  1340
-rw-r--r--  src/H5Fio.c      20
2 files changed, 544 insertions, 816 deletions
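At a high level, the old code path described every selected chunk with a single MPI derived datatype and issued one collective MPI-IO call through H5D__final_collective_io; the new path instead hands the lower layers parallel arrays of (file address, size, buffer) elements, one element per chunk. The following self-contained sketch illustrates the shape of the new approach; the type and function names are illustrative stand-ins, not the actual entry points this commit adds:

/* Hedged sketch of vector-style chunk I/O; all names are stand-ins */
#include <stdlib.h>

typedef unsigned long long faddr_sketch_t; /* stand-in for HDF5's haddr_t */

typedef struct chunk_sketch_t {
    faddr_sketch_t file_addr; /* chunk's offset in the file */
    size_t         size;      /* filtered chunk size in bytes */
    void          *buf;       /* in-memory chunk buffer */
} chunk_sketch_t;

/* Stub standing in for a collective vector write (e.g., through the VFD layer) */
static int
vector_write_sketch(size_t count, const faddr_sketch_t *addrs, const size_t *sizes,
                    const void **bufs)
{
    (void)count; (void)addrs; (void)sizes; (void)bufs;
    return 0;
}

static int
write_chunks_as_vector(const chunk_sketch_t *chunks, size_t nchunks)
{
    faddr_sketch_t *addrs;
    size_t         *sizes;
    const void    **bufs;
    int             ret = -1;

    /* A rank with nothing to write still participates, with an empty vector */
    if (nchunks == 0)
        return vector_write_sketch(0, NULL, NULL, NULL);

    addrs = malloc(nchunks * sizeof(*addrs));
    sizes = malloc(nchunks * sizeof(*sizes));
    bufs  = malloc(nchunks * sizeof(*bufs));

    if (addrs && sizes && bufs) {
        /* One vector element per chunk; no MPI derived types are constructed */
        for (size_t i = 0; i < nchunks; i++) {
            addrs[i] = chunks[i].file_addr;
            sizes[i] = chunks[i].size;
            bufs[i]  = chunks[i].buf;
        }
        ret = vector_write_sketch(nchunks, addrs, sizes, bufs);
    }

    free(addrs);
    free(sizes);
    free(bufs);
    return ret;
}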
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 812e748..be76c42 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -213,7 +213,7 @@ typedef struct H5D_chunk_index_info_t {
* hh - A handle for hash tables provided by the uthash.h header
*
*/
-typedef struct H5D_filtered_collective_io_info_t {
+typedef struct H5D_filtered_collective_chunk_info_t {
H5D_chunk_index_info_t index_info;
H5D_piece_info_t *chunk_info;
@@ -229,6 +229,46 @@ typedef struct H5D_filtered_collective_io_info_t {
void *buf;
UT_hash_handle hh;
+} H5D_filtered_collective_chunk_info_t;
+
+/*
+ * Top-level structure that contains an array of H5D_filtered_collective_chunk_info_t
+ * chunk info structures for collective filtered I/O, as well as other useful information.
+ * The struct's fields are as follows:
+ *
+ * chunk_infos - An array of H5D_filtered_collective_chunk_info_t structures that each
+ * contain information about a single chunk when performing collective filtered
+ * I/O.
+ *
+ * chunk_hash_table - A hash table storing H5D_filtered_collective_chunk_info_t structures
+ * that is populated when chunk modification data has to be shared between
+ * MPI processes during collective filtered I/O. This hash table facilitates
+ * quicker and easier lookup of a particular chunk by its "chunk index"
+ * value when applying chunk data modification messages from another MPI
+ * process. Each modification message received from another MPI process
+ * will contain the chunk's "chunk index" value that can be used for chunk
+ * lookup operations.
+ *
+ * num_chunk_infos - The number of entries in the `chunk_infos` array.
+ *
+ * num_chunks_to_read - The number of entries (or chunks) in the `chunk_infos` array that
+ * will need to be read in the case of a read operation (which can
+ * occur during dataset reads, or during dataset writes when a chunk
+ * has to go through a read-modify-write cycle). The value for
+ * this field is based on a chunk's `need_read` field in its particular
+ * H5D_filtered_collective_chunk_info_t structure, but may be adjusted
+ * later depending on file space allocation timing and other factors.
+ *
+ * As an example, this field avoids the need to scan through the list
+ * of chunk info structures just to determine how large the I/O vectors
+ * allocated during read operations need to be.
+ *
+ */
+typedef struct H5D_filtered_collective_io_info_t {
+ H5D_filtered_collective_chunk_info_t *chunk_infos;
+ H5D_filtered_collective_chunk_info_t *chunk_hash_table;
+ size_t num_chunk_infos;
+ size_t num_chunks_to_read;
} H5D_filtered_collective_io_info_t;
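As a rough sketch of how these two fields work together — the flat `chunk_infos` array owns the entries, while `chunk_hash_table` merely indexes them by chunk index for message lookup — consider the following, which assumes uthash.h is available (as the `hh` handle above indicates) but otherwise uses hypothetical names:

/* Hedged sketch of the chunk_infos / chunk_hash_table relationship */
#include <stddef.h>
#include "uthash.h"

typedef struct chunk_info_sketch_t {
    unsigned long long chunk_idx; /* stand-in for index_info.chunk_idx */
    void              *buf;
    UT_hash_handle     hh;
} chunk_info_sketch_t;

typedef struct chunk_list_sketch_t {
    chunk_info_sketch_t *chunk_infos;      /* flat array of owned chunks */
    chunk_info_sketch_t *chunk_hash_table; /* head pointer for uthash lookups */
    size_t               num_chunk_infos;
} chunk_list_sketch_t;

/* Index every owned chunk by its chunk index for fast lookups */
static void
build_hash_table(chunk_list_sketch_t *list)
{
    for (size_t i = 0; i < list->num_chunk_infos; i++)
        HASH_ADD(hh, list->chunk_hash_table, chunk_idx,
                 sizeof(unsigned long long), &list->chunk_infos[i]);
}

/* Find the local chunk that a received modification message refers to */
static chunk_info_sketch_t *
lookup_chunk(chunk_list_sketch_t *list, unsigned long long msg_chunk_idx)
{
    chunk_info_sketch_t *entry = NULL;

    HASH_FIND(hh, list->chunk_hash_table, &msg_chunk_idx,
              sizeof(unsigned long long), entry);
    return entry;
}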
/*
@@ -273,12 +313,10 @@ static herr_t H5D__obtain_mpio_mode(H5D_io_info_t *io_info, H5D_dset_io_info_t *
static herr_t H5D__mpio_get_sum_chunk(const H5D_io_info_t *io_info, int *sum_chunkf);
static herr_t H5D__mpio_get_sum_chunk_dset(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *dset_info,
int *sum_chunkf);
-static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di,
- H5D_filtered_collective_io_info_t **chunk_list,
- size_t *num_entries, int mpi_rank);
+static herr_t H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info,
+ const H5D_dset_io_info_t *di, int mpi_rank,
+ H5D_filtered_collective_io_info_t *chunk_list);
static herr_t H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
const H5D_io_info_t *io_info, int mpi_rank, int mpi_size,
size_t **rank_chunks_assigned_map);
static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chunk_list,
@@ -287,35 +325,24 @@ static herr_t H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_i
const H5D_io_info_t *io_info, int mpi_rank,
int mpi_size);
static herr_t H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list,
- size_t *chunk_list_num_entries, H5D_io_info_t *io_info,
- H5D_dset_io_info_t *dset_info, int mpi_rank,
- int mpi_size,
- H5D_filtered_collective_io_info_t **chunk_hash_table,
- unsigned char ***chunk_msg_bufs,
- int *chunk_msg_bufs_len);
-static herr_t H5D__mpio_collective_filtered_chunk_common_io(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
- const H5D_io_info_t *io_info, int mpi_size);
+ H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info,
+ int mpi_rank, int H5_ATTR_NDEBUG_UNUSED mpi_size,
+ unsigned char ***chunk_msg_bufs,
+ int *chunk_msg_bufs_len);
static herr_t H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
- const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di, int mpi_rank,
- int mpi_size);
+ const H5D_io_info_t *io_info,
+ const H5D_dset_io_info_t *di, int mpi_rank);
static herr_t H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
- H5D_filtered_collective_io_info_t *chunk_hash_table,
unsigned char **chunk_msg_bufs,
int chunk_msg_bufs_len, const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di, int mpi_rank,
- int mpi_size);
+ const H5D_dset_io_info_t *di,
+ int H5_ATTR_NDEBUG_UNUSED mpi_rank);
static herr_t H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
size_t *num_chunks_assigned_map,
H5D_io_info_t *io_info,
H5D_chk_idx_info_t *idx_info, int mpi_rank,
int mpi_size);
static herr_t H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
size_t *num_chunks_assigned_map,
H5D_io_info_t *io_info, H5D_dset_io_info_t *di,
H5D_chk_idx_info_t *idx_info, int mpi_rank,
@@ -329,10 +356,8 @@ static herr_t H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hb
static herr_t H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, hbool_t *contig_type_derived,
MPI_Datatype *resized_type,
hbool_t *resized_type_derived);
-static herr_t H5D__mpio_collective_filtered_io_type(H5D_filtered_collective_io_info_t *chunk_list,
- size_t num_entries, H5D_io_op_type_t op_type,
- MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
- MPI_Datatype *new_file_type, hbool_t *file_type_derived);
+static herr_t H5D__mpio_collective_filtered_vec_io(const H5D_filtered_collective_io_info_t *chunk_list,
+ H5F_shared_t *f_sh, H5D_io_op_type_t op_type);
static int H5D__cmp_piece_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
const void *filtered_collective_io_info_entry2);
@@ -342,7 +367,7 @@ static int H5D__cmp_chunk_redistribute_info_orig_owner(const void *entry1, co
#ifdef H5Dmpio_DEBUG
static herr_t H5D__mpio_debug_init(void);
static herr_t H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, int mpi_rank);
+ int mpi_rank);
#endif
/*********************/
@@ -1730,7 +1755,7 @@ done:
*
* TODO: Note that steps D. and F. here are both collective
* operations that partially share data from the
- * H5D_filtered_collective_io_info_t structure. To
+ * H5D_filtered_collective_chunk_info_t structure. To
* try to conserve on memory a bit, the distributed
* arrays these operations create are discarded after
* each operation is performed. If memory consumption
@@ -1748,20 +1773,11 @@ static herr_t
H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank,
int mpi_size)
{
- H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
- H5D_filtered_collective_io_info_t *chunk_hash_table = NULL;
- unsigned char **chunk_msg_bufs = NULL;
- MPI_Datatype mem_type = MPI_BYTE;
- MPI_Datatype file_type = MPI_BYTE;
- hbool_t mem_type_is_derived = FALSE;
- hbool_t file_type_is_derived = FALSE;
- size_t *rank_chunks_assigned_map = NULL;
- size_t chunk_list_num_entries;
- size_t i;
- int chunk_msg_bufs_len = 0;
- char fake_buf; /* Used as a fake buffer for ranks with no chunks, thus a NULL buf pointer */
- int mpi_code;
- herr_t ret_value = SUCCEED;
+ H5D_filtered_collective_io_info_t chunk_list = {0};
+ unsigned char **chunk_msg_bufs = NULL;
+ size_t *rank_chunks_assigned_map = NULL;
+ int chunk_msg_bufs_len = 0;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr)
@@ -1783,36 +1799,32 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE);
/* Build a list of selected chunks in the collective io operation */
- if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, &chunk_list, &chunk_list_num_entries,
- mpi_rank) < 0)
+ if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
- if (H5D__mpio_collective_filtered_chunk_read(chunk_list, chunk_list_num_entries, io_info, dset_info,
- mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_read(&chunk_list, io_info, dset_info, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks")
}
else { /* Filtered collective write */
H5D_chk_idx_info_t index_info;
- hsize_t mpi_buf_count;
H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset);
if (mpi_size > 1) {
/* Redistribute shared chunks being written to */
- if (H5D__mpio_redistribute_shared_chunks(chunk_list, chunk_list_num_entries, io_info, mpi_rank,
- mpi_size, &rank_chunks_assigned_map) < 0)
+ if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size,
+ &rank_chunks_assigned_map) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks")
/* Send any chunk modification messages for chunks this rank no longer owns */
- if (H5D__mpio_share_chunk_modification_data(chunk_list, &chunk_list_num_entries, io_info,
- dset_info, mpi_rank, mpi_size, &chunk_hash_table,
+ if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size,
&chunk_msg_bufs, &chunk_msg_bufs_len) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"unable to send chunk modification data between MPI ranks")
/* Make sure the local chunk list was updated correctly */
- assert(chunk_list_num_entries == rank_chunks_assigned_map[mpi_rank]);
+ assert(chunk_list.num_chunk_infos == rank_chunks_assigned_map[mpi_rank]);
}
/* Proceed to update all the chunks this rank owns with its own
@@ -1820,97 +1832,57 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_
* the chunks. As chunk reads are done collectively here, all ranks
* must participate.
*/
- if (H5D__mpio_collective_filtered_chunk_update(chunk_list, chunk_list_num_entries, chunk_hash_table,
- chunk_msg_bufs, chunk_msg_bufs_len, io_info, dset_info,
- mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_update(&chunk_list, chunk_msg_bufs, chunk_msg_bufs_len,
+ io_info, dset_info, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks")
/* Free up resources used by chunk hash table now that we're done updating chunks */
- HASH_CLEAR(hh, chunk_hash_table);
+ HASH_CLEAR(hh, chunk_list.chunk_hash_table);
/* All ranks now collectively re-allocate file space for all chunks */
- if (H5D__mpio_collective_filtered_chunk_reallocate(chunk_list, chunk_list_num_entries,
- rank_chunks_assigned_map, io_info, &index_info,
- mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_reallocate(&chunk_list, rank_chunks_assigned_map, io_info,
+ &index_info, mpi_rank, mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-allocate file space for chunks")
- /* If this rank has any chunks selected, create a MPI type for collectively
- * writing out the chunks to file. Otherwise, the rank contributes to the
- * collective write with a none type.
- */
- if (H5D__mpio_collective_filtered_io_type(chunk_list, chunk_list_num_entries, io_info->op_type,
- &mem_type, &mem_type_is_derived, &file_type,
- &file_type_is_derived) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL,
- "couldn't create MPI type for writing filtered chunks")
-
- mpi_buf_count = (file_type_is_derived || mem_type_is_derived) ? 1 : 0;
-
- /* Setup contig storage info for I/O operation */
- if (chunk_list_num_entries) {
- /*
- * Override the write buffer to point to the first
- * chunk's data buffer
- */
- io_info->base_maddr.cvp = chunk_list[0].buf;
-
- /*
- * Setup the base storage address for this operation
- * to be the first chunk's file address
- */
- io_info->store_faddr = chunk_list[0].chunk_new.offset;
- }
- else {
- io_info->base_maddr.cvp = &fake_buf;
- io_info->store_faddr = 0;
- }
-
- /* Perform I/O */
- if (H5D__final_collective_io(io_info, mpi_buf_count, file_type, mem_type) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+ /* Perform vector I/O on chunks */
+ if (H5D__mpio_collective_filtered_vec_io(&chunk_list, io_info->f_sh, io_info->op_type) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't perform vector I/O on filtered chunks")
/* Free up resources in anticipation of following collective operation */
- for (i = 0; i < chunk_list_num_entries; i++) {
- if (chunk_list[i].buf) {
- H5MM_free(chunk_list[i].buf);
- chunk_list[i].buf = NULL;
+ for (size_t i = 0; i < chunk_list.num_chunk_infos; i++) {
+ if (chunk_list.chunk_infos[i].buf) {
+ H5MM_free(chunk_list.chunk_infos[i].buf);
+ chunk_list.chunk_infos[i].buf = NULL;
}
}
/* Participate in the collective re-insertion of all chunks modified
* into the chunk index
*/
- if (H5D__mpio_collective_filtered_chunk_reinsert(chunk_list, chunk_list_num_entries,
- rank_chunks_assigned_map, io_info, dset_info,
- &index_info, mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_reinsert(&chunk_list, rank_chunks_assigned_map, io_info,
+ dset_info, &index_info, mpi_rank, mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-insert modified chunks into chunk index")
}
done:
- /* Free the MPI buf and file types, if they were derived */
- if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
-
if (chunk_msg_bufs) {
- for (i = 0; i < (size_t)chunk_msg_bufs_len; i++)
+ for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++)
H5MM_free(chunk_msg_bufs[i]);
H5MM_free(chunk_msg_bufs);
}
- HASH_CLEAR(hh, chunk_hash_table);
+ HASH_CLEAR(hh, chunk_list.chunk_hash_table);
/* Free resources used by a rank which had some selection */
- if (chunk_list) {
- for (i = 0; i < chunk_list_num_entries; i++)
- if (chunk_list[i].buf)
- H5MM_free(chunk_list[i].buf);
+ if (chunk_list.chunk_infos) {
+ for (size_t i = 0; i < chunk_list.num_chunk_infos; i++)
+ if (chunk_list.chunk_infos[i].buf)
+ H5MM_free(chunk_list.chunk_infos[i].buf);
- H5MM_free(chunk_list);
+ H5MM_free(chunk_list.chunk_infos);
} /* end if */
if (rank_chunks_assigned_map)
@@ -2202,7 +2174,7 @@ done:
*
* TODO: Note that steps E. and G. here are both collective
* operations that partially share data from the
- * H5D_filtered_collective_io_info_t structure. To
+ * H5D_filtered_collective_chunk_info_t structure. To
* try to conserve on memory a bit, the distributed
* arrays these operations create are discarded after
* each operation is performed. If memory consumption
@@ -2220,21 +2192,13 @@ static herr_t
H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info_t *dset_info, int mpi_rank,
int mpi_size)
{
- H5D_filtered_collective_io_info_t *chunk_list = NULL; /* The list of chunks being read/written */
- H5D_filtered_collective_io_info_t *chunk_hash_table = NULL;
- unsigned char **chunk_msg_bufs = NULL;
- H5D_io_info_t ctg_io_info; /* Contiguous I/O info object */
- MPI_Datatype mem_type = MPI_BYTE;
- MPI_Datatype file_type = MPI_BYTE;
- hbool_t mem_type_is_derived = FALSE;
- hbool_t file_type_is_derived = FALSE;
- hbool_t have_chunk_to_process;
- size_t chunk_list_num_entries;
- size_t i;
- size_t max_num_chunks;
- int chunk_msg_bufs_len = 0;
- int mpi_code;
- herr_t ret_value = SUCCEED;
+ H5D_filtered_collective_io_info_t chunk_list = {0};
+ unsigned char **chunk_msg_bufs = NULL;
+ hbool_t have_chunk_to_process;
+ size_t max_num_chunks;
+ int chunk_msg_bufs_len = 0;
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE_TAG(dset_info->dset->oloc.addr)
@@ -2256,12 +2220,11 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
H5CX_set_mpio_actual_io_mode(H5D_MPIO_CHUNK_COLLECTIVE);
/* Build a list of selected chunks in the collective IO operation */
- if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, &chunk_list, &chunk_list_num_entries,
- mpi_rank) < 0)
+ if (H5D__mpio_collective_filtered_chunk_io_setup(io_info, dset_info, mpi_rank, &chunk_list) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't construct filtered I/O info list")
/* Retrieve the maximum number of chunks selected for any rank */
- if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list_num_entries, &max_num_chunks, 1,
+ if (MPI_SUCCESS != (mpi_code = MPI_Allreduce(&chunk_list.num_chunk_infos, &max_num_chunks, 1,
MPI_UNSIGNED_LONG_LONG, MPI_MAX, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allreduce failed", mpi_code)
@@ -2269,41 +2232,51 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
if (0 == max_num_chunks)
HGOTO_DONE(SUCCEED);
- /* Set up contiguous I/O info object */
- H5MM_memcpy(&ctg_io_info, io_info, sizeof(ctg_io_info));
-
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
- for (i = 0; i < max_num_chunks; i++) {
+ for (size_t i = 0; i < max_num_chunks; i++) {
+ H5D_filtered_collective_io_info_t single_chunk_list = {0};
+
/* Check if this rank has a chunk to work on for this iteration */
- have_chunk_to_process = (i < chunk_list_num_entries);
+ have_chunk_to_process = (i < chunk_list.num_chunk_infos);
- if (H5D__mpio_collective_filtered_chunk_read(have_chunk_to_process ? &chunk_list[i] : NULL,
- have_chunk_to_process ? 1 : 0, io_info, dset_info,
- mpi_rank, mpi_size) < 0)
+ /*
+ * Setup a chunk list structure for either 1 or 0 chunks, depending
+ * on whether this rank has a chunk to work on for this iteration
+ */
+ if (have_chunk_to_process) {
+ single_chunk_list.chunk_infos = &chunk_list.chunk_infos[i];
+ single_chunk_list.num_chunk_infos = 1;
+ single_chunk_list.num_chunks_to_read = chunk_list.chunk_infos[i].need_read ? 1 : 0;
+ }
+ else {
+ single_chunk_list.chunk_infos = NULL;
+ single_chunk_list.num_chunk_infos = 0;
+ single_chunk_list.num_chunks_to_read = 0;
+ }
+
+ if (H5D__mpio_collective_filtered_chunk_read(&single_chunk_list, io_info, dset_info, mpi_rank) <
+ 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't read filtered chunks")
- if (have_chunk_to_process && chunk_list[i].buf) {
- H5MM_free(chunk_list[i].buf);
- chunk_list[i].buf = NULL;
+ if (have_chunk_to_process && chunk_list.chunk_infos[i].buf) {
+ H5MM_free(chunk_list.chunk_infos[i].buf);
+ chunk_list.chunk_infos[i].buf = NULL;
}
}
}
else { /* Filtered collective write */
H5D_chk_idx_info_t index_info;
- hsize_t mpi_buf_count;
/* Construct chunked index info */
H5D_MPIO_INIT_CHUNK_IDX_INFO(index_info, dset_info->dset);
if (mpi_size > 1) {
/* Redistribute shared chunks being written to */
- if (H5D__mpio_redistribute_shared_chunks(chunk_list, chunk_list_num_entries, io_info, mpi_rank,
- mpi_size, NULL) < 0)
+ if (H5D__mpio_redistribute_shared_chunks(&chunk_list, io_info, mpi_rank, mpi_size, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "unable to redistribute shared chunks")
/* Send any chunk modification messages for chunks this rank no longer owns */
- if (H5D__mpio_share_chunk_modification_data(chunk_list, &chunk_list_num_entries, io_info,
- dset_info, mpi_rank, mpi_size, &chunk_hash_table,
+ if (H5D__mpio_share_chunk_modification_data(&chunk_list, io_info, dset_info, mpi_rank, mpi_size,
&chunk_msg_bufs, &chunk_msg_bufs_len) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"unable to send chunk modification data between MPI ranks")
@@ -2313,113 +2286,83 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, H5D_dset_io_info
* have no chunks left to work on, but it still needs to participate in the
* collective re-allocation and re-insertion of chunks modified by other ranks.
*/
- for (i = 0; i < max_num_chunks; i++) {
+ for (size_t i = 0; i < max_num_chunks; i++) {
+ H5D_filtered_collective_io_info_t single_chunk_list = {0};
+
/* Check if this rank has a chunk to work on for this iteration */
- have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].new_owner);
+ have_chunk_to_process =
+ (i < chunk_list.num_chunk_infos) && (mpi_rank == chunk_list.chunk_infos[i].new_owner);
+
+ /*
+ * Setup a chunk list structure for either 1 or 0 chunks, depending
+ * on whether this rank has a chunk to work on for this iteration
+ */
+ if (have_chunk_to_process) {
+ single_chunk_list.chunk_infos = &chunk_list.chunk_infos[i];
+ single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table;
+ single_chunk_list.num_chunk_infos = 1;
+ single_chunk_list.num_chunks_to_read = chunk_list.chunk_infos[i].need_read ? 1 : 0;
+ }
+ else {
+ single_chunk_list.chunk_infos = NULL;
+ single_chunk_list.chunk_hash_table = chunk_list.chunk_hash_table;
+ single_chunk_list.num_chunk_infos = 0;
+ single_chunk_list.num_chunks_to_read = 0;
+ }
/* Proceed to update the chunk this rank owns (if any left) with its
* own modification data and data from other ranks, before re-filtering
* the chunks. As chunk reads are done collectively here, all ranks
* must participate.
*/
- if (H5D__mpio_collective_filtered_chunk_update(have_chunk_to_process ? &chunk_list[i] : NULL,
- have_chunk_to_process ? 1 : 0, chunk_hash_table,
- chunk_msg_bufs, chunk_msg_bufs_len, io_info,
- dset_info, mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_update(
+ &single_chunk_list, chunk_msg_bufs, chunk_msg_bufs_len, io_info, dset_info, mpi_rank) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't update modified chunks")
/* All ranks now collectively re-allocate file space for all chunks */
- if (H5D__mpio_collective_filtered_chunk_reallocate(have_chunk_to_process ? &chunk_list[i] : NULL,
- have_chunk_to_process ? 1 : 0, NULL, io_info,
- &index_info, mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_reallocate(&single_chunk_list, NULL, io_info, &index_info,
+ mpi_rank, mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-allocate file space for chunks")
- /*
- * If this rank has a chunk to work on, create a MPI type
- * for writing out the chunk. Otherwise, the rank will
- * use MPI_BYTE for the file and memory type and specify
- * a count of 0.
- */
- if (H5D__mpio_collective_filtered_io_type(
- have_chunk_to_process ? &chunk_list[i] : NULL, have_chunk_to_process ? 1 : 0,
- io_info->op_type, &mem_type, &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL,
- "couldn't create MPI type for writing filtered chunks")
-
- mpi_buf_count = (file_type_is_derived || mem_type_is_derived) ? 1 : 0;
-
- /* Override the write buffer to point to the chunk data buffer */
- if (have_chunk_to_process) {
- /*
- * Override the write buffer to point to the
- * chunk's data buffer
- */
- ctg_io_info.base_maddr.cvp = chunk_list[i].buf;
-
- /*
- * Setup the base storage address for this
- * operation to be the chunk's file address
- */
- ctg_io_info.store_faddr = chunk_list[i].chunk_new.offset;
- }
- else {
- ctg_io_info.store_faddr = 0;
- ctg_io_info.base_maddr = dset_info->buf;
- }
-
- /* Perform the I/O */
- if (H5D__final_collective_io(&ctg_io_info, mpi_buf_count, file_type, mem_type) < 0)
- HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish MPI-IO")
+ /* Perform vector I/O on chunks */
+ if (H5D__mpio_collective_filtered_vec_io(&single_chunk_list, io_info->f_sh, io_info->op_type) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
+ "couldn't perform vector I/O on filtered chunks")
/* Free up resources in anticipation of following collective operation */
- if (have_chunk_to_process && chunk_list[i].buf) {
- H5MM_free(chunk_list[i].buf);
- chunk_list[i].buf = NULL;
+ if (have_chunk_to_process && chunk_list.chunk_infos[i].buf) {
+ H5MM_free(chunk_list.chunk_infos[i].buf);
+ chunk_list.chunk_infos[i].buf = NULL;
}
/* Participate in the collective re-insertion of all chunks modified
* in this iteration into the chunk index
*/
- if (H5D__mpio_collective_filtered_chunk_reinsert(have_chunk_to_process ? &chunk_list[i] : NULL,
- have_chunk_to_process ? 1 : 0, NULL, io_info,
- dset_info, &index_info, mpi_rank, mpi_size) < 0)
+ if (H5D__mpio_collective_filtered_chunk_reinsert(&single_chunk_list, NULL, io_info, dset_info,
+ &index_info, mpi_rank, mpi_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"couldn't collectively re-insert modified chunks into chunk index")
-
- /* Free the MPI types, if they were derived */
- if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- mem_type_is_derived = FALSE;
- if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- file_type_is_derived = FALSE;
} /* end for */
}
done:
- /* Free the MPI buf and file types, if they were derived */
- if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
-
if (chunk_msg_bufs) {
- for (i = 0; i < (size_t)chunk_msg_bufs_len; i++)
+ for (size_t i = 0; i < (size_t)chunk_msg_bufs_len; i++)
H5MM_free(chunk_msg_bufs[i]);
H5MM_free(chunk_msg_bufs);
}
- HASH_CLEAR(hh, chunk_hash_table);
+ HASH_CLEAR(hh, chunk_list.chunk_hash_table);
/* Free resources used by a rank which had some selection */
- if (chunk_list) {
- for (i = 0; i < chunk_list_num_entries; i++)
- if (chunk_list[i].buf)
- H5MM_free(chunk_list[i].buf);
+ if (chunk_list.chunk_infos) {
+ for (size_t i = 0; i < chunk_list.num_chunk_infos; i++)
+ if (chunk_list.chunk_infos[i].buf)
+ H5MM_free(chunk_list.chunk_infos[i].buf);
- H5MM_free(chunk_list);
+ H5MM_free(chunk_list.chunk_infos);
} /* end if */
#ifdef H5Dmpio_DEBUG
@@ -2643,16 +2586,16 @@ static int
H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
const void *filtered_collective_io_info_entry2)
{
- const H5D_filtered_collective_io_info_t *entry1;
- const H5D_filtered_collective_io_info_t *entry2;
- haddr_t addr1 = HADDR_UNDEF;
- haddr_t addr2 = HADDR_UNDEF;
- int ret_value;
+ const H5D_filtered_collective_chunk_info_t *entry1;
+ const H5D_filtered_collective_chunk_info_t *entry2;
+ haddr_t addr1 = HADDR_UNDEF;
+ haddr_t addr2 = HADDR_UNDEF;
+ int ret_value;
FUNC_ENTER_PACKAGE_NOERR
- entry1 = (const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry1;
- entry2 = (const H5D_filtered_collective_io_info_t *)filtered_collective_io_info_entry2;
+ entry1 = (const H5D_filtered_collective_chunk_info_t *)filtered_collective_io_info_entry1;
+ entry2 = (const H5D_filtered_collective_chunk_info_t *)filtered_collective_io_info_entry2;
addr1 = entry1->chunk_new.offset;
addr2 = entry2->chunk_new.offset;
@@ -3002,29 +2945,27 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
- H5D_filtered_collective_io_info_t **chunk_list,
- size_t *num_entries, int mpi_rank)
+ int mpi_rank, H5D_filtered_collective_io_info_t *chunk_list)
{
- H5D_filtered_collective_io_info_t *local_info_array = NULL;
- H5D_chunk_ud_t udata;
- hbool_t filter_partial_edge_chunks;
- size_t num_chunks_selected;
- size_t i;
- herr_t ret_value = SUCCEED;
+ H5D_filtered_collective_chunk_info_t *local_info_array = NULL;
+ H5D_chunk_ud_t udata;
+ hbool_t filter_partial_edge_chunks;
+ size_t num_chunks_selected;
+ size_t num_chunks_to_read = 0;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
assert(io_info);
assert(di);
+ assert(di->layout->type == H5D_CHUNKED);
assert(chunk_list);
- assert(num_entries);
+
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TRACE_ENTER(mpi_rank);
H5D_MPIO_TIME_START(mpi_rank, "Filtered Collective I/O Setup");
#endif
- assert(di->layout->type == H5D_CHUNKED);
-
/* Each rank builds a local list of the chunks they have selected */
if ((num_chunks_selected = H5SL_count(di->layout_io_info.chunk_map->dset_sel_pieces))) {
H5D_piece_info_t *chunk_info;
@@ -3040,7 +2981,7 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate local io info array buffer")
chunk_node = H5SL_first(di->layout_io_info.chunk_map->dset_sel_pieces);
- for (i = 0; chunk_node; i++) {
+ for (size_t i = 0; chunk_node; i++) {
chunk_info = (H5D_piece_info_t *)H5SL_item(chunk_node);
/* Obtain this chunk's address */
@@ -3109,6 +3050,9 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const
local_info_array[i].io_size < (size_t)di->dset->shared->layout.u.chunk.size;
}
+ if (local_info_array[i].need_read)
+ num_chunks_to_read++;
+
local_info_array[i].skip_filter_pline = FALSE;
if (!filter_partial_edge_chunks) {
/*
@@ -3159,12 +3103,8 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const
/* Ensure the chunk list is sorted in ascending order of offset in the file */
if (need_sort)
- qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_io_info_t),
+ qsort(local_info_array, num_chunks_selected, sizeof(H5D_filtered_collective_chunk_info_t),
H5D__cmp_filtered_collective_io_info_entry);
-
-#ifdef H5Dmpio_DEBUG
- H5D__mpio_dump_collective_filtered_chunk_list(local_info_array, num_chunks_selected, mpi_rank);
-#endif
}
else if (H5F_get_coll_metadata_reads(di->dset->oloc.file)) {
hsize_t scaled[H5O_LAYOUT_NDIMS] = {0};
@@ -3187,10 +3127,19 @@ H5D__mpio_collective_filtered_chunk_io_setup(const H5D_io_info_t *io_info, const
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
}
- *chunk_list = local_info_array;
- *num_entries = num_chunks_selected;
+ chunk_list->chunk_infos = local_info_array;
+ chunk_list->num_chunk_infos = num_chunks_selected;
+ chunk_list->num_chunks_to_read = num_chunks_to_read;
+
+#ifdef H5Dmpio_DEBUG
+ H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, mpi_rank);
+#endif
done:
+ if (ret_value < 0) {
+ H5MM_free(local_info_array);
+ }
+
#ifdef H5Dmpio_DEBUG
H5D_MPIO_TIME_STOP(mpi_rank);
H5D_MPIO_TRACE_EXIT(mpi_rank);
@@ -3221,8 +3170,8 @@ done:
*/
static herr_t
H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, const H5D_io_info_t *io_info,
- int mpi_rank, int mpi_size, size_t **rank_chunks_assigned_map)
+ const H5D_io_info_t *io_info, int mpi_rank, int mpi_size,
+ size_t **rank_chunks_assigned_map)
{
hbool_t redistribute_on_all_ranks;
size_t *num_chunks_map = NULL;
@@ -3233,7 +3182,7 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li
FUNC_ENTER_PACKAGE
- assert(chunk_list || 0 == chunk_list_num_entries);
+ assert(chunk_list);
assert(io_info);
assert(mpi_size > 1); /* No chunk sharing is possible for MPI Comm size of 1 */
@@ -3251,7 +3200,7 @@ H5D__mpio_redistribute_shared_chunks(H5D_filtered_collective_io_info_t *chunk_li
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate assigned chunks array")
/* Perform initial Allgather to determine the collective chunk list size */
- if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list_num_entries, 1, H5_SIZE_T_AS_MPI_TYPE,
+ if (MPI_SUCCESS != (mpi_code = MPI_Allgather(&chunk_list->num_chunk_infos, 1, H5_SIZE_T_AS_MPI_TYPE,
num_chunks_map, 1, H5_SIZE_T_AS_MPI_TYPE, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Allgather failed", mpi_code)
@@ -3376,8 +3325,8 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
FUNC_ENTER_PACKAGE
+ assert(chunk_list);
assert(num_chunks_assigned_map);
- assert(chunk_list || 0 == num_chunks_assigned_map[mpi_rank]);
assert(io_info);
assert(mpi_size > 1);
@@ -3437,9 +3386,9 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
"can't create derived datatypes for chunk redistribution info")
/* Perform gather operation */
- if (H5_mpio_gatherv_alloc(chunk_list, num_chunks_int, struct_type, counts_ptr, displacements_ptr,
- packed_type, all_ranks_involved, 0, io_info->comm, mpi_rank, mpi_size,
- &coll_chunk_list, &coll_chunk_list_num_entries) < 0)
+ if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, num_chunks_int, struct_type, counts_ptr,
+ displacements_ptr, packed_type, all_ranks_involved, 0, io_info->comm, mpi_rank,
+ mpi_size, &coll_chunk_list, &coll_chunk_list_num_entries) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL,
"can't gather chunk redistribution info to involved ranks")
@@ -3568,8 +3517,19 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
coll_entry = &((H5D_chunk_redistribute_info_t *)coll_chunk_list)[i++];
- chunk_list[j].new_owner = coll_entry->new_owner;
- chunk_list[j].num_writers = coll_entry->num_writers;
+ chunk_list->chunk_infos[j].new_owner = coll_entry->new_owner;
+ chunk_list->chunk_infos[j].num_writers = coll_entry->num_writers;
+
+ /*
+ * Check if the chunk list struct's `num_chunks_to_read` field
+ * needs to be updated
+ */
+ if (chunk_list->chunk_infos[j].need_read && (chunk_list->chunk_infos[j].new_owner != mpi_rank)) {
+ chunk_list->chunk_infos[j].need_read = FALSE;
+
+ assert(chunk_list->num_chunks_to_read > 0);
+ chunk_list->num_chunks_to_read--;
+ }
}
}
else {
@@ -3579,13 +3539,27 @@ H5D__mpio_redistribute_shared_chunks_int(H5D_filtered_collective_io_info_t *chun
* local chunk lists get updated
*/
if (MPI_SUCCESS !=
- (mpi_code = MPI_Scatterv(coll_chunk_list, counts_ptr, displacements_ptr, packed_type, chunk_list,
- num_chunks_int, struct_type, 0, io_info->comm)))
+ (mpi_code = MPI_Scatterv(coll_chunk_list, counts_ptr, displacements_ptr, packed_type,
+ chunk_list->chunk_infos, num_chunks_int, struct_type, 0, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
+
+ /*
+ * Now that chunks have been redistributed, each rank must update
+ * their chunk list struct's `num_chunks_to_read` field since it
+ * may now be out of date.
+ */
+ for (i = 0; i < chunk_list->num_chunk_infos; i++) {
+ if ((chunk_list->chunk_infos[i].new_owner != mpi_rank) && chunk_list->chunk_infos[i].need_read) {
+ chunk_list->chunk_infos[i].need_read = FALSE;
+
+ assert(chunk_list->num_chunks_to_read > 0);
+ chunk_list->num_chunks_to_read--;
+ }
+ }
}
#ifdef H5Dmpio_DEBUG
- H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, num_chunks_assigned_map[mpi_rank], mpi_rank);
+ H5D__mpio_dump_collective_filtered_chunk_list(chunk_list, mpi_rank);
#endif
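Both redistribution paths above decrement `num_chunks_to_read` as chunks pass to other owners, so the count stays accurate without a rescan. Downstream read code can then size its I/O vectors with a single allocation. A hedged sketch of that sizing idea, with illustrative types and names rather than the actual implementation:

/* Hedged sketch: size read vectors from num_chunks_to_read in one shot */
#include <stdbool.h>
#include <stdlib.h>

typedef struct read_chunk_sketch_t {
    bool               need_read;
    unsigned long long file_addr;
    size_t             size;
    void              *buf;
} read_chunk_sketch_t;

static int
collect_read_vector(const read_chunk_sketch_t *chunks, size_t nchunks,
                    size_t num_chunks_to_read)
{
    unsigned long long *addrs;
    size_t             *sizes;
    void              **bufs;
    size_t              vec_idx = 0;
    int                 ret     = -1;

    if (num_chunks_to_read == 0)
        return 0; /* nothing to read; still a valid collective contribution */

    addrs = malloc(num_chunks_to_read * sizeof(*addrs));
    sizes = malloc(num_chunks_to_read * sizeof(*sizes));
    bufs  = malloc(num_chunks_to_read * sizeof(*bufs));

    if (addrs && sizes && bufs) {
        /* Only chunks still flagged `need_read` contribute a vector element */
        for (size_t i = 0; i < nchunks; i++) {
            if (!chunks[i].need_read)
                continue;
            addrs[vec_idx] = chunks[i].file_addr;
            sizes[vec_idx] = chunks[i].size;
            bufs[vec_idx]  = chunks[i].buf;
            vec_idx++;
        }
        /* vec_idx == num_chunks_to_read here; hand the arrays to the
         * vector read call (omitted in this sketch) */
        ret = 0;
    }

    free(addrs);
    free(sizes);
    free(bufs);
    return ret;
}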
done:
@@ -3684,36 +3658,33 @@ done:
*-------------------------------------------------------------------------
*/
static herr_t
-H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list,
- size_t *chunk_list_num_entries, H5D_io_info_t *io_info,
+H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk_list, H5D_io_info_t *io_info,
H5D_dset_io_info_t *dset_info, int mpi_rank,
- int H5_ATTR_NDEBUG_UNUSED mpi_size,
- H5D_filtered_collective_io_info_t **chunk_hash_table,
- unsigned char ***chunk_msg_bufs, int *chunk_msg_bufs_len)
+ int H5_ATTR_NDEBUG_UNUSED mpi_size, unsigned char ***chunk_msg_bufs,
+ int *chunk_msg_bufs_len)
{
#if H5_CHECK_MPI_VERSION(3, 0)
- H5D_filtered_collective_io_info_t *chunk_table = NULL;
- H5S_sel_iter_t *mem_iter = NULL;
- unsigned char **msg_send_bufs = NULL;
- unsigned char **msg_recv_bufs = NULL;
- MPI_Request *send_requests = NULL;
- MPI_Request *recv_requests = NULL;
- MPI_Request ibarrier = MPI_REQUEST_NULL;
- hbool_t mem_iter_init = FALSE;
- hbool_t ibarrier_posted = FALSE;
- size_t send_bufs_nalloc = 0;
- size_t num_send_requests = 0;
- size_t num_recv_requests = 0;
- size_t num_msgs_incoming = 0;
- size_t last_assigned_idx;
- size_t i;
- int mpi_code;
- herr_t ret_value = SUCCEED;
+ H5D_filtered_collective_chunk_info_t *chunk_table = NULL;
+ H5S_sel_iter_t *mem_iter = NULL;
+ unsigned char **msg_send_bufs = NULL;
+ unsigned char **msg_recv_bufs = NULL;
+ MPI_Request *send_requests = NULL;
+ MPI_Request *recv_requests = NULL;
+ MPI_Request ibarrier = MPI_REQUEST_NULL;
+ hbool_t mem_iter_init = FALSE;
+ hbool_t ibarrier_posted = FALSE;
+ size_t send_bufs_nalloc = 0;
+ size_t num_send_requests = 0;
+ size_t num_recv_requests = 0;
+ size_t num_msgs_incoming = 0;
+ size_t last_assigned_idx;
+ size_t i;
+ int mpi_code;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
- assert(chunk_list_num_entries);
- assert(chunk_list || 0 == *chunk_list_num_entries);
+ assert(chunk_list);
assert(io_info);
assert(dset_info);
assert(mpi_size > 1);
@@ -3728,7 +3699,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
/* Set to latest format for encoding dataspace */
H5CX_set_libver_bounds(NULL);
- if (*chunk_list_num_entries) {
+ if (chunk_list->num_chunk_infos > 0) {
/* Allocate a selection iterator for iterating over chunk dataspaces */
if (NULL == (mem_iter = H5FL_MALLOC(H5S_sel_iter_t)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate dataspace selection iterator")
@@ -3760,8 +3731,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
* synchronous sends to send the data this rank is writing to
* the rank that does own the chunk.
*/
- for (i = 0, last_assigned_idx = 0; i < *chunk_list_num_entries; i++) {
- H5D_filtered_collective_io_info_t *chunk_entry = &chunk_list[i];
+ for (i = 0, last_assigned_idx = 0; i < chunk_list->num_chunk_infos; i++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
if (mpi_rank == chunk_entry->new_owner) {
num_msgs_incoming += (size_t)(chunk_entry->num_writers - 1);
@@ -3771,7 +3742,7 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
* does own, since it has sent the necessary data and is no longer
* interested in the chunks it doesn't own.
*/
- chunk_list[last_assigned_idx] = chunk_list[i];
+ chunk_list->chunk_infos[last_assigned_idx] = chunk_list->chunk_infos[i];
/*
* Since, at large scale, a chunk's index value may be larger than
@@ -3783,7 +3754,8 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
* message itself will contain the chunk's index so we can update
* the correct chunk with the received data.
*/
- HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t), &chunk_list[last_assigned_idx]);
+ HASH_ADD(hh, chunk_table, index_info.chunk_idx, sizeof(hsize_t),
+ &chunk_list->chunk_infos[last_assigned_idx]);
last_assigned_idx++;
}
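The surrounding machinery (`MPI_Request ibarrier`, `ibarrier_posted`, the send/receive buffer arrays) follows a nonblocking-consensus style exchange: each rank posts nonblocking synchronous sends for chunks it gave up, keeps probing for incoming modification messages, and uses MPI_Ibarrier to detect global completion. A condensed, hypothetical rendering of that pattern (fixed-size messages for brevity; this is not the function's exact control flow):

/* Hedged sketch of a nonblocking-consensus (NBX) exchange; requires MPI >= 3.0 */
#include <mpi.h>
#include <stdbool.h>
#include <stdlib.h>

static int
nbx_exchange(MPI_Comm comm, char **send_bufs, const int *dests, int nsends, int msg_size)
{
    MPI_Request *send_reqs = malloc((nsends > 0 ? (size_t)nsends : 1) * sizeof(*send_reqs));
    MPI_Request  ibarrier  = MPI_REQUEST_NULL;
    bool         ibarrier_posted = false;
    int          done = 0;

    if (!send_reqs)
        return -1;

    /* Synchronous mode: send completion implies the receiver got the message */
    for (int i = 0; i < nsends; i++)
        MPI_Issend(send_bufs[i], msg_size, MPI_BYTE, dests[i], 0, comm, &send_reqs[i]);

    while (!done) {
        int        msg_pending = 0;
        MPI_Status status;

        /* Drain any modification message that has arrived */
        MPI_Iprobe(MPI_ANY_SOURCE, 0, comm, &msg_pending, &status);
        if (msg_pending) {
            char *buf = malloc((size_t)msg_size);

            if (!buf) {
                free(send_reqs);
                return -1;
            }
            MPI_Recv(buf, msg_size, MPI_BYTE, status.MPI_SOURCE, 0, comm, MPI_STATUS_IGNORE);
            free(buf); /* real code would stash the buffer and apply it later */
        }

        if (!ibarrier_posted) {
            int all_sent = 0;

            /* Once all local sends have completed, join the barrier */
            MPI_Testall(nsends, send_reqs, &all_sent, MPI_STATUSES_IGNORE);
            if (all_sent) {
                MPI_Ibarrier(comm, &ibarrier);
                ibarrier_posted = true;
            }
        }
        else
            /* Barrier completion implies every rank has finished sending */
            MPI_Test(&ibarrier, &done, MPI_STATUS_IGNORE);
    }

    free(send_reqs);
    return 0;
}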
@@ -4013,10 +3985,12 @@ H5D__mpio_share_chunk_modification_data(H5D_filtered_collective_io_info_t *chunk
H5_GCC_DIAG_ON("stringop-overflow")
/* Set the new number of locally-selected chunks */
- *chunk_list_num_entries = last_assigned_idx;
+ chunk_list->num_chunk_infos = last_assigned_idx;
+
+ /* Set chunk hash table pointer for future use */
+ chunk_list->chunk_hash_table = chunk_table;
/* Return chunk message buffers if any were received */
- *chunk_hash_table = chunk_table;
*chunk_msg_bufs = msg_recv_bufs;
*chunk_msg_bufs_len = (int)num_recv_requests;
@@ -4087,129 +4061,12 @@ done:
} /* end H5D__mpio_share_chunk_modification_data() */
/*-------------------------------------------------------------------------
- * Function: H5D__mpio_collective_filtered_chunk_common_io
- *
- * Purpose: This routine performs the common part of collective I/O
- * when reading or writing filtered chunks collectively.
- *
- * Return: Non-negative on success/Negative on failure
- *
- *-------------------------------------------------------------------------
- */
-static herr_t
-H5D__mpio_collective_filtered_chunk_common_io(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, const H5D_io_info_t *io_info,
- int mpi_size)
-{
- H5D_io_info_t coll_io_info;
- MPI_Datatype file_type = MPI_DATATYPE_NULL;
- MPI_Datatype mem_type = MPI_DATATYPE_NULL;
- hbool_t mem_type_is_derived = FALSE;
- hbool_t file_type_is_derived = FALSE;
- hsize_t mpi_buf_count;
- haddr_t base_read_offset = HADDR_UNDEF;
- size_t num_chunks;
- size_t i;
- char fake_buf; /* Used as a fake buffer for ranks with no chunks, thus a NULL buf pointer */
- int mpi_code;
- herr_t ret_value = SUCCEED;
-
- FUNC_ENTER_PACKAGE
-
- assert(chunk_list || 0 == chunk_list_num_entries);
- assert(io_info);
-
- /* Initialize temporary I/O info */
- coll_io_info = *io_info;
-
- /*
- * Construct MPI derived datatype for collective I/O on chunks
- */
- if (H5D__mpio_collective_filtered_io_type(chunk_list, chunk_list_num_entries, io_info->op_type, &mem_type,
- &mem_type_is_derived, &file_type, &file_type_is_derived) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_BADTYPE, FAIL, "couldn't create MPI I/O type for chunk I/O")
-
- /*
- * For reads, determine how many chunks are actually being read.
- * Note that if this is a read during a write operation
- * (read chunk -> unfilter -> modify -> write back), some
- * chunks may not need to be read if they're being fully
- * overwritten during a write operation.
- */
- if (io_info->op_type == H5D_IO_OP_READ) {
- for (i = 0, num_chunks = 0; i < chunk_list_num_entries; i++) {
- assert(chunk_list[i].buf);
-
- if (chunk_list[i].need_read) {
- if (!H5_addr_defined(base_read_offset))
- base_read_offset = chunk_list[i].chunk_current.offset;
-
- num_chunks++;
- }
- }
- }
- else
- num_chunks = chunk_list_num_entries;
-
- /*
- * If this rank doesn't have a selection, it can
- * skip I/O if the MPI communicator size is 1.
- *
- * Otherwise, this rank has to participate in
- * collective I/O, but probably has a NULL buf
- * pointer, so override to a fake buffer since our
- * write/read function expects one.
- */
- if (num_chunks == 0) {
- if (mpi_size == 1)
- HGOTO_DONE(SUCCEED);
- else {
- if (io_info->op_type == H5D_IO_OP_WRITE)
- coll_io_info.base_maddr.cvp = &fake_buf;
- else
- coll_io_info.base_maddr.vp = &fake_buf;
- }
- }
-
- /*
- * Setup for I/O operation
- */
-
- mpi_buf_count = (num_chunks) ? 1 : 0;
-
- if (num_chunks) {
- /*
- * Setup the base storage address for this operation
- * to be the first chunk's file address
- */
- if (io_info->op_type == H5D_IO_OP_WRITE)
- coll_io_info.store_faddr = chunk_list[0].chunk_new.offset;
- else
- coll_io_info.store_faddr = base_read_offset;
- }
- else
- coll_io_info.store_faddr = 0;
-
- /* Perform I/O */
- if (H5D__final_collective_io(&coll_io_info, mpi_buf_count, file_type, mem_type) < 0)
- HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish MPI I/O")
-
-done:
- /* Free the MPI buf and file types, if they were derived */
- if (mem_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- if (file_type_is_derived && MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
-
- FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__mpio_collective_filtered_chunk_common_io() */
-
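Worth noting about this deleted routine: the `fake_buf` placeholder and the 0-vs-1 `mpi_buf_count` juggling existed only because collective MPI-IO with derived types forces every rank to supply a valid buffer and datatype even when it transfers nothing. With vector I/O, a rank with no chunks simply contributes an empty vector, as the minimal sketch below illustrates (the entry point is a hypothetical stand-in):

/* Hedged sketch: empty participation in a collective vector call */
#include <stddef.h>

/* Hypothetical stand-in for a collective vector write entry point */
static int
vector_write_stub(size_t count, const unsigned long long *addrs,
                  const size_t *sizes, const void **bufs)
{
    (void)count; (void)addrs; (void)sizes; (void)bufs;
    return 0;
}

int
participate_with_no_chunks(void)
{
    /* No fake buffer, no MPI_BYTE placeholder type, no count juggling */
    return vector_write_stub(0, NULL, NULL, NULL);
}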
-/*-------------------------------------------------------------------------
* Function: H5D__mpio_collective_filtered_chunk_read
*
* Purpose: This routine coordinates a collective read across all ranks
* of the chunks they have selected. Each rank will then go
- * and
+ * and unfilter their read chunks as necessary and scatter
+ * the data into the provided read buffer.
*
* Return: Non-negative on success/Negative on failure
*
@@ -4217,27 +4074,24 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, const H5D_io_info_t *io_info,
- const H5D_dset_io_info_t *di, int mpi_rank, int mpi_size)
+ const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
+ int mpi_rank)
{
H5D_fill_buf_info_t fb_info;
- H5D_piece_info_t *chunk_info = NULL;
- H5D_io_info_t coll_io_info;
H5Z_EDC_t err_detect; /* Error detection info */
H5Z_cb_t filter_cb; /* I/O filter callback function */
hsize_t file_chunk_size = 0;
hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
- hbool_t should_fill = FALSE;
- hbool_t fb_info_init = FALSE;
- hbool_t index_empty = FALSE;
- size_t i;
+ hbool_t should_fill = FALSE;
+ hbool_t fb_info_init = FALSE;
+ hbool_t index_empty = FALSE;
H5S_t *fill_space = NULL;
void *base_read_buf = NULL;
herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
- assert(chunk_list || 0 == chunk_list_num_entries);
+ assert(chunk_list);
assert(io_info);
assert(di);
@@ -4248,11 +4102,7 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
(void)mpi_rank;
#endif
- /* Initialize temporary I/O info */
- coll_io_info = *io_info;
- coll_io_info.base_maddr.vp = NULL;
-
- if (chunk_list_num_entries) {
+ if (chunk_list->num_chunk_infos) {
/* Retrieve filter settings from API context */
if (H5CX_get_err_detect(&err_detect) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info")
@@ -4281,12 +4131,14 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
* size; reading into a (smaller) buffer of size equal to the unfiltered
* chunk size would of course be bad.
*/
- for (i = 0; i < chunk_list_num_entries; i++) {
- assert(chunk_list[i].need_read);
+ for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+
+ assert(chunk_entry->need_read);
- chunk_list[i].chunk_buf_size = MAX(chunk_list[i].chunk_current.length, file_chunk_size);
+ chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size);
- if (NULL == (chunk_list[i].buf = H5MM_malloc(chunk_list[i].chunk_buf_size))) {
+ if (NULL == (chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size))) {
/* Push an error, but participate in collective read */
HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
break;
@@ -4297,22 +4149,26 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
* read it from the file. Instead, just fill the chunk buffer
* with the fill value if necessary.
*/
- if (H5_addr_defined(chunk_list[i].chunk_current.offset)) {
+ if (H5_addr_defined(chunk_entry->chunk_current.offset)) {
/* Set first read buffer */
if (!base_read_buf)
- base_read_buf = chunk_list[i].buf;
+ base_read_buf = chunk_entry->buf;
/* Set chunk's new length for eventual filter pipeline calls */
- if (chunk_list[i].skip_filter_pline)
- chunk_list[i].chunk_new.length = file_chunk_size;
+ if (chunk_entry->skip_filter_pline)
+ chunk_entry->chunk_new.length = file_chunk_size;
else
- chunk_list[i].chunk_new.length = chunk_list[i].chunk_current.length;
+ chunk_entry->chunk_new.length = chunk_entry->chunk_current.length;
}
else {
- chunk_list[i].need_read = FALSE;
+ chunk_entry->need_read = FALSE;
+
+ /* Update field keeping track of number of chunks to read */
+ assert(chunk_list->num_chunks_to_read > 0);
+ chunk_list->num_chunks_to_read--;
/* Set chunk's new length for eventual filter pipeline calls */
- chunk_list[i].chunk_new.length = file_chunk_size;
+ chunk_entry->chunk_new.length = file_chunk_size;
if (should_fill) {
/* Initialize fill value buffer if not already initialized */
@@ -4341,7 +4197,7 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
/* Write fill value to memory buffer */
assert(fb_info.fill_buf);
- if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_list[i].buf,
+ if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf,
di->type_info.mem_type, fill_space) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "couldn't fill chunk buffer with fill value")
}
@@ -4359,49 +4215,42 @@ H5D__mpio_collective_filtered_chunk_read(H5D_filtered_collective_io_info_t *chun
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty")
if (!index_empty) {
- /*
- * Override the read buffer to point to the address of
- * the first chunk data buffer being read into
- */
- if (base_read_buf)
- coll_io_info.base_maddr.vp = base_read_buf;
-
- /* Perform collective chunk read */
- if (H5D__mpio_collective_filtered_chunk_common_io(chunk_list, chunk_list_num_entries, &coll_io_info,
- mpi_size) < 0)
- HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish collective filtered chunk read")
+ /* Perform collective vector read */
+ if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks")
}
/*
* Iterate through all the read chunks, unfiltering them and scattering their
* data out to the application's read buffer.
*/
- for (i = 0; i < chunk_list_num_entries; i++) {
- chunk_info = chunk_list[i].chunk_info;
+ for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ H5D_piece_info_t *chunk_info = chunk_entry->chunk_info;
/* Unfilter the chunk, unless we didn't read it from the file */
- if (chunk_list[i].need_read && !chunk_list[i].skip_filter_pline) {
+ if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) {
if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
- &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb,
- (size_t *)&chunk_list[i].chunk_new.length, &chunk_list[i].chunk_buf_size,
- &chunk_list[i].buf) < 0)
+ &(chunk_entry->index_info.filter_mask), err_detect, filter_cb,
+ (size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size,
+ &chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
}
/* Scatter the chunk data to the read buffer */
iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace);
- if (H5D_select_io_mem(di->buf.vp, chunk_info->mspace, chunk_list[i].buf, chunk_info->fspace,
+ if (H5D_select_io_mem(di->buf.vp, chunk_info->mspace, chunk_entry->buf, chunk_info->fspace,
di->type_info.src_type_size, (size_t)iter_nelmts) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't copy chunk data to read buffer")
}
done:
/* Free all resources used by entries in the chunk list */
- for (i = 0; i < chunk_list_num_entries; i++) {
- if (chunk_list[i].buf) {
- H5MM_free(chunk_list[i].buf);
- chunk_list[i].buf = NULL;
+ for (size_t i = 0; i < chunk_list->num_chunk_infos; i++) {
+ if (chunk_list->chunk_infos[i].buf) {
+ H5MM_free(chunk_list->chunk_infos[i].buf);
+ chunk_list->chunk_infos[i].buf = NULL;
}
}
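The tail of this function shows the post-read flow: chunks that actually came from the file are pushed backwards through the filter pipeline, then every selected chunk is scattered into the application's read buffer. A compact sketch of that two-step pass, with trivial stubs standing in for H5Z_pipeline and H5D_select_io_mem:

/* Hedged sketch of the per-chunk post-read pass: unfilter, then scatter */
#include <stdbool.h>
#include <stddef.h>

typedef struct read_entry_sketch_t {
    bool   need_read;         /* was this chunk actually read from the file? */
    bool   skip_filter_pline; /* e.g., unfiltered partial edge chunks */
    void  *buf;               /* chunk bytes, filtered or raw */
    size_t len;
} read_entry_sketch_t;

/* Trivial stand-in for H5Z_pipeline() run in reverse (decompression) */
static int
unfilter_in_place(void **buf, size_t *len)
{
    (void)buf; (void)len;
    return 0;
}

/* Trivial stand-in for H5D_select_io_mem() scattering into the app buffer */
static int
scatter_to_app_buf(const void *chunk_buf, void *app_buf)
{
    (void)chunk_buf; (void)app_buf;
    return 0;
}

static int
post_read_pass(read_entry_sketch_t *entries, size_t n, void *app_buf)
{
    for (size_t i = 0; i < n; i++) {
        /* Unfilter only chunks that truly came from the file and that
         * weren't exempted from the filter pipeline */
        if (entries[i].need_read && !entries[i].skip_filter_pline)
            if (unfilter_in_place(&entries[i].buf, &entries[i].len) < 0)
                return -1;

        /* Every selected chunk scatters its data to the read buffer */
        if (scatter_to_app_buf(entries[i].buf, app_buf) < 0)
            return -1;
    }
    return 0;
}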
@@ -4433,19 +4282,15 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries,
- H5D_filtered_collective_io_info_t *chunk_hash_table,
unsigned char **chunk_msg_bufs, int chunk_msg_bufs_len,
const H5D_io_info_t *io_info, const H5D_dset_io_info_t *di,
- int H5_ATTR_NDEBUG_UNUSED mpi_rank, int mpi_size)
+ int H5_ATTR_NDEBUG_UNUSED mpi_rank)
{
const H5D_type_info_t *type_info = NULL;
H5D_fill_buf_info_t fb_info;
- H5D_piece_info_t *chunk_info = NULL;
- H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */
- H5D_io_info_t coll_io_info;
- H5Z_EDC_t err_detect; /* Error detection info */
- H5Z_cb_t filter_cb; /* I/O filter callback function */
+ H5S_sel_iter_t *sel_iter = NULL; /* Dataspace selection iterator for H5D__scatter_mem */
+ H5Z_EDC_t err_detect; /* Error detection info */
+ H5Z_cb_t filter_cb; /* I/O filter callback function */
hsize_t file_chunk_size = 0;
hsize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
hbool_t should_fill = FALSE;
@@ -4460,8 +4305,8 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
FUNC_ENTER_PACKAGE
- assert(chunk_list || 0 == chunk_list_num_entries);
- assert((chunk_msg_bufs && chunk_hash_table) || 0 == chunk_msg_bufs_len);
+ assert(chunk_list);
+ assert((chunk_msg_bufs && chunk_list->chunk_hash_table) || 0 == chunk_msg_bufs_len);
assert(io_info);
assert(di);
@@ -4474,7 +4319,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
type_info = &(di->type_info);
assert(type_info);
- if (chunk_list_num_entries) {
+ if (chunk_list->num_chunk_infos > 0) {
/* Retrieve filter settings from API context */
if (H5CX_get_err_detect(&err_detect) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get error detection info")
@@ -4509,50 +4354,56 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
* size; reading into a (smaller) buffer of size equal to the unfiltered
* chunk size would of course be bad.
*/
- for (i = 0; i < chunk_list_num_entries; i++) {
- assert(mpi_rank == chunk_list[i].new_owner);
+ for (i = 0; i < chunk_list->num_chunk_infos; i++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
- chunk_list[i].chunk_buf_size = MAX(chunk_list[i].chunk_current.length, file_chunk_size);
+ assert(mpi_rank == chunk_entry->new_owner);
+
+ chunk_entry->chunk_buf_size = MAX(chunk_entry->chunk_current.length, file_chunk_size);
/*
* If this chunk hasn't been allocated yet and we aren't writing
* out fill values to it, make sure to 0-fill its memory buffer
* so we don't use uninitialized memory.
*/
- if (!H5_addr_defined(chunk_list[i].chunk_current.offset) && !should_fill)
- chunk_list[i].buf = H5MM_calloc(chunk_list[i].chunk_buf_size);
+ if (!H5_addr_defined(chunk_entry->chunk_current.offset) && !should_fill)
+ chunk_entry->buf = H5MM_calloc(chunk_entry->chunk_buf_size);
else
- chunk_list[i].buf = H5MM_malloc(chunk_list[i].chunk_buf_size);
+ chunk_entry->buf = H5MM_malloc(chunk_entry->chunk_buf_size);
- if (NULL == chunk_list[i].buf) {
+ if (NULL == chunk_entry->buf) {
/* Push an error, but participate in collective read */
HDONE_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer");
break;
}
/* Set chunk's new length for eventual filter pipeline calls */
- if (chunk_list[i].need_read) {
+ if (chunk_entry->need_read) {
/*
* Check if chunk is currently allocated. If not, don't try to
* read it from the file. Instead, just fill the chunk buffer
* with the fill value if fill values are to be written.
*/
- if (H5_addr_defined(chunk_list[i].chunk_current.offset)) {
+ if (H5_addr_defined(chunk_entry->chunk_current.offset)) {
/* Set first read buffer */
if (!base_read_buf)
- base_read_buf = chunk_list[i].buf;
+ base_read_buf = chunk_entry->buf;
/* Set chunk's new length for eventual filter pipeline calls */
- if (chunk_list[i].skip_filter_pline)
- chunk_list[i].chunk_new.length = file_chunk_size;
+ if (chunk_entry->skip_filter_pline)
+ chunk_entry->chunk_new.length = file_chunk_size;
else
- chunk_list[i].chunk_new.length = chunk_list[i].chunk_current.length;
+ chunk_entry->chunk_new.length = chunk_entry->chunk_current.length;
}
else {
- chunk_list[i].need_read = FALSE;
+ chunk_entry->need_read = FALSE;
+
+ /* Update field keeping track of number of chunks to read */
+ assert(chunk_list->num_chunks_to_read > 0);
+ chunk_list->num_chunks_to_read--;
/* Set chunk's new length for eventual filter pipeline calls */
- chunk_list[i].chunk_new.length = file_chunk_size;
+ chunk_entry->chunk_new.length = file_chunk_size;
if (should_fill) {
/* Initialize fill value buffer if not already initialized */
@@ -4583,7 +4434,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
/* Write fill value to memory buffer */
assert(fb_info.fill_buf);
- if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_list[i].buf,
+ if (H5D__fill(fb_info.fill_buf, di->dset->shared->type, chunk_entry->buf,
type_info->mem_type, fill_space) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL,
"couldn't fill chunk buffer with fill value")
@@ -4591,7 +4442,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
}
}
else
- chunk_list[i].chunk_new.length = file_chunk_size;
+ chunk_entry->chunk_new.length = file_chunk_size;
}
/*
@@ -4605,25 +4456,9 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "couldn't determine if chunk index is empty")
if (!index_empty) {
- /*
- * Setup for I/O operation
- */
-
- /* Initialize temporary I/O info */
- coll_io_info = *io_info;
- coll_io_info.op_type = H5D_IO_OP_READ;
-
- /* Override the read buffer to point to the address of the first
- * chunk data buffer being read into
- */
- if (base_read_buf) {
- coll_io_info.base_maddr.vp = base_read_buf;
- }
-
- /* Read all chunks that need to be read from the file */
- if (H5D__mpio_collective_filtered_chunk_common_io(chunk_list, chunk_list_num_entries, &coll_io_info,
- mpi_size) < 0)
- HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish collective filtered chunk read")
+ /* Perform collective vector read */
+ if (H5D__mpio_collective_filtered_vec_io(chunk_list, io_info->f_sh, H5D_IO_OP_READ) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't perform vector I/O on filtered chunks")
}
/*
@@ -4632,26 +4467,27 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
*/
/* Process all chunks with data from the owning rank first */
- for (i = 0; i < chunk_list_num_entries; i++) {
- assert(mpi_rank == chunk_list[i].new_owner);
+ for (i = 0; i < chunk_list->num_chunk_infos; i++) {
+ H5D_filtered_collective_chunk_info_t *chunk_entry = &chunk_list->chunk_infos[i];
+ H5D_piece_info_t *chunk_info = chunk_entry->chunk_info;
- chunk_info = chunk_list[i].chunk_info;
+ assert(mpi_rank == chunk_entry->new_owner);
/*
* If this chunk wasn't being fully overwritten, we read it from
* the file, so we need to unfilter it
*/
- if (chunk_list[i].need_read && !chunk_list[i].skip_filter_pline) {
+ if (chunk_entry->need_read && !chunk_entry->skip_filter_pline) {
if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE,
- &(chunk_list[i].index_info.filter_mask), err_detect, filter_cb,
- (size_t *)&chunk_list[i].chunk_new.length, &chunk_list[i].chunk_buf_size,
- &chunk_list[i].buf) < 0)
+ &(chunk_entry->index_info.filter_mask), err_detect, filter_cb,
+ (size_t *)&chunk_entry->chunk_new.length, &chunk_entry->chunk_buf_size,
+ &chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
}
iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace);
- if (H5D_select_io_mem(chunk_list[i].buf, chunk_info->fspace, di->buf.cvp, chunk_info->mspace,
+ if (H5D_select_io_mem(chunk_entry->buf, chunk_info->fspace, di->buf.cvp, chunk_info->mspace,
type_info->dst_type_size, (size_t)iter_nelmts) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't copy chunk data to write buffer")
}
@@ -4662,9 +4498,9 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
/* Now process all received chunk message buffers */
for (i = 0; i < (size_t)chunk_msg_bufs_len; i++) {
- H5D_filtered_collective_io_info_t *chunk_entry = NULL;
- const unsigned char *msg_ptr = chunk_msg_bufs[i];
- hsize_t chunk_idx;
+ H5D_filtered_collective_chunk_info_t *chunk_entry = NULL;
+ const unsigned char *msg_ptr = chunk_msg_bufs[i];
+ hsize_t chunk_idx;
if (msg_ptr) {
/* Retrieve the chunk's index value */
@@ -4672,7 +4508,7 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
msg_ptr += sizeof(hsize_t);
/* Find the chunk entry according to its chunk index */
- HASH_FIND(hh, chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry);
+ HASH_FIND(hh, chunk_list->chunk_hash_table, &chunk_idx, sizeof(hsize_t), chunk_entry);
if (chunk_entry == NULL)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFIND, FAIL, "unable to find chunk entry")
if (mpi_rank != chunk_entry->new_owner)
@@ -4720,17 +4556,18 @@ H5D__mpio_collective_filtered_chunk_update(H5D_filtered_collective_io_info_t *ch
}
/* Finally, filter all the chunks */
- for (i = 0; i < chunk_list_num_entries; i++) {
- if (!chunk_list[i].skip_filter_pline) {
- if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, 0, &(chunk_list[i].index_info.filter_mask),
- err_detect, filter_cb, (size_t *)&chunk_list[i].chunk_new.length,
- &chunk_list[i].chunk_buf_size, &chunk_list[i].buf) < 0)
+ for (i = 0; i < chunk_list->num_chunk_infos; i++) {
+ if (!chunk_list->chunk_infos[i].skip_filter_pline) {
+ if (H5Z_pipeline(&di->dset->shared->dcpl_cache.pline, 0,
+ &(chunk_list->chunk_infos[i].index_info.filter_mask), err_detect, filter_cb,
+ (size_t *)&chunk_list->chunk_infos[i].chunk_new.length,
+ &chunk_list->chunk_infos[i].chunk_buf_size, &chunk_list->chunk_infos[i].buf) < 0)
HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "output pipeline failed")
}
#if H5_SIZEOF_SIZE_T > 4
/* Check for the chunk expanding too much to encode in a 32-bit value */
- if (chunk_list[i].chunk_new.length > ((size_t)0xffffffff))
+ if (chunk_list->chunk_infos[i].chunk_new.length > ((size_t)0xffffffff))
HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
#endif
}
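For reference, the chunk modification messages consumed by the message-processing loop above follow a simple framing convention: a leading hsize_t chunk index (used for the HASH_FIND lookup) followed by the serialized modification data. The exact serialization of the trailing data is outside this excerpt; a minimal decode sketch, with a hypothetical helper name, might look like this:

    #include <string.h>
    #include "hdf5.h" /* for hsize_t */

    /* Hypothetical sketch: split one received modification message into its
     * chunk index header and its trailing modification data, mirroring the
     * msg_ptr arithmetic in the loop above.
     */
    static void
    decode_chunk_msg(const unsigned char *msg_ptr, hsize_t *chunk_idx_out,
                     const unsigned char **mod_data_out)
    {
        /* First sizeof(hsize_t) bytes: chunk index for the hash table lookup */
        memcpy(chunk_idx_out, msg_ptr, sizeof(hsize_t));

        /* Remainder of the message: serialized selection + element data */
        *mod_data_out = msg_ptr + sizeof(hsize_t);
    }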
@@ -4752,10 +4589,10 @@ done:
/* On failure, try to free all resources used by entries in the chunk list */
if (ret_value < 0) {
- for (i = 0; i < chunk_list_num_entries; i++) {
- if (chunk_list[i].buf) {
- H5MM_free(chunk_list[i].buf);
- chunk_list[i].buf = NULL;
+ for (i = 0; i < chunk_list->num_chunk_infos; i++) {
+ if (chunk_list->chunk_infos[i].buf) {
+ H5MM_free(chunk_list->chunk_infos[i].buf);
+ chunk_list->chunk_infos[i].buf = NULL;
}
}
}
@@ -4783,9 +4620,8 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, size_t *num_chunks_assigned_map,
- H5D_io_info_t *io_info, H5D_chk_idx_info_t *idx_info,
- int mpi_rank, int mpi_size)
+ size_t *num_chunks_assigned_map, H5D_io_info_t *io_info,
+ H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size)
{
H5D_chunk_alloc_info_t *collective_list = NULL;
MPI_Datatype send_type;
@@ -4805,7 +4641,7 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
FUNC_ENTER_PACKAGE
- assert(chunk_list || 0 == chunk_list_num_entries);
+ assert(chunk_list);
assert(io_info);
assert(idx_info);
assert(idx_info->storage->idx_type != H5D_CHUNK_IDX_NONE);
@@ -4819,7 +4655,7 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
* Make sure it's safe to cast this rank's number
* of chunks to be sent into an int for MPI
*/
- H5_CHECK_OVERFLOW(chunk_list_num_entries, size_t, int);
+ H5_CHECK_OVERFLOW(chunk_list->num_chunk_infos, size_t, int);
/* Create derived datatypes for the chunk file space info needed */
if (H5D__mpio_get_chunk_alloc_info_types(&recv_type, &recv_type_derived, &send_type, &send_type_derived) <
@@ -4859,9 +4695,9 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
}
/* Perform gather operation */
- if (H5_mpio_gatherv_alloc(chunk_list, (int)chunk_list_num_entries, send_type, counts_ptr,
- displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size,
- &gathered_array, &collective_num_entries) < 0)
+ if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type,
+ counts_ptr, displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank,
+ mpi_size, &gathered_array, &collective_num_entries) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk file space info to/from ranks")
}
else {
@@ -4872,9 +4708,9 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
* contributing before performing the actual gather operation. Use
* the 'simple' MPI_Allgatherv wrapper for this.
*/
- if (H5_mpio_gatherv_alloc_simple(chunk_list, (int)chunk_list_num_entries, send_type, recv_type, TRUE,
- 0, io_info->comm, mpi_rank, mpi_size, &gathered_array,
- &collective_num_entries) < 0)
+ if (H5_mpio_gatherv_alloc_simple(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type,
+ recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size,
+ &gathered_array, &collective_num_entries) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "can't gather chunk file space info to/from ranks")
}
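The 'simple' wrapper mentioned above is not shown in this diff; as a rough sketch (not HDF5's actual helper), the share-counts-first pattern it describes could be implemented like this:

    #include <mpi.h>
    #include <stdlib.h>

    /* Rough sketch of the "share counts first" pattern: every rank learns how
     * much each rank contributes, builds displacements, then gathers the
     * variable-sized contributions. The caller is assumed to have sized
     * 'recvbuf' for the total number of elements.
     */
    static int
    simple_allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                      void *recvbuf, MPI_Datatype recvtype, MPI_Comm comm, int mpi_size)
    {
        int *counts = malloc((size_t)mpi_size * sizeof(int));
        int *displs = malloc((size_t)mpi_size * sizeof(int));
        int  total  = 0;

        if (!counts || !displs) {
            free(counts);
            free(displs);
            return -1;
        }

        /* Share per-rank contribution counts */
        MPI_Allgather(&sendcount, 1, MPI_INT, counts, 1, MPI_INT, comm);

        /* Build displacements from the gathered counts */
        for (int i = 0; i < mpi_size; i++) {
            displs[i] = total;
            total += counts[i];
        }

        /* Gather the actual variable-sized contributions from all ranks */
        MPI_Allgatherv(sendbuf, sendcount, sendtype, recvbuf, counts, displs, recvtype, comm);

        free(displs);
        free(counts);
        return 0;
    }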
@@ -4894,14 +4730,14 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
* rank, make sure to update the chunk entry in the local
* chunk list
*/
- update_local_chunk =
- (num_local_chunks_processed < chunk_list_num_entries) &&
- (coll_entry->chunk_idx == chunk_list[num_local_chunks_processed].index_info.chunk_idx);
+ update_local_chunk = (num_local_chunks_processed < chunk_list->num_chunk_infos) &&
+ (coll_entry->chunk_idx ==
+ chunk_list->chunk_infos[num_local_chunks_processed].index_info.chunk_idx);
if (update_local_chunk) {
- H5D_filtered_collective_io_info_t *local_chunk;
+ H5D_filtered_collective_chunk_info_t *local_chunk;
- local_chunk = &chunk_list[num_local_chunks_processed];
+ local_chunk = &chunk_list->chunk_infos[num_local_chunks_processed];
/* Sanity check that this chunk is actually local */
assert(mpi_rank == local_chunk->orig_owner);
@@ -4917,7 +4753,8 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
*/
if (num_local_chunks_processed) {
haddr_t curr_chunk_offset = local_chunk->chunk_new.offset;
- haddr_t prev_chunk_offset = chunk_list[num_local_chunks_processed - 1].chunk_new.offset;
+ haddr_t prev_chunk_offset =
+ chunk_list->chunk_infos[num_local_chunks_processed - 1].chunk_new.offset;
assert(H5_addr_defined(prev_chunk_offset) && H5_addr_defined(curr_chunk_offset));
if (curr_chunk_offset < prev_chunk_offset)
@@ -4928,15 +4765,15 @@ H5D__mpio_collective_filtered_chunk_reallocate(H5D_filtered_collective_io_info_t
}
}
- assert(chunk_list_num_entries == num_local_chunks_processed);
+ assert(chunk_list->num_chunk_infos == num_local_chunks_processed);
/*
* Ensure this rank's local chunk list is sorted in
* ascending order of offset in the file
*/
if (need_sort)
- qsort(chunk_list, chunk_list_num_entries, sizeof(H5D_filtered_collective_io_info_t),
- H5D__cmp_filtered_collective_io_info_entry);
+ qsort(chunk_list->chunk_infos, chunk_list->num_chunk_infos,
+ sizeof(H5D_filtered_collective_chunk_info_t), H5D__cmp_filtered_collective_io_info_entry);
done:
H5MM_free(gathered_array);
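The comparator passed to qsort above is defined elsewhere in this file; a plausible minimal version consistent with the ascending-offset requirement (function name hypothetical) would be:

    /* Hypothetical comparator: order chunk entries by ascending file offset
     * of their newly allocated blocks, avoiding overflow from subtraction.
     */
    static int
    cmp_chunk_entry_by_offset(const void *entry1, const void *entry2)
    {
        haddr_t addr1 = ((const H5D_filtered_collective_chunk_info_t *)entry1)->chunk_new.offset;
        haddr_t addr2 = ((const H5D_filtered_collective_chunk_info_t *)entry2)->chunk_new.offset;

        return (addr1 > addr2) - (addr1 < addr2);
    }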
@@ -4974,9 +4811,9 @@ done:
*/
static herr_t
H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, size_t *num_chunks_assigned_map,
- H5D_io_info_t *io_info, H5D_dset_io_info_t *di,
- H5D_chk_idx_info_t *idx_info, int mpi_rank, int mpi_size)
+ size_t *num_chunks_assigned_map, H5D_io_info_t *io_info,
+ H5D_dset_io_info_t *di, H5D_chk_idx_info_t *idx_info,
+ int mpi_rank, int mpi_size)
{
H5D_chunk_ud_t chunk_ud;
MPI_Datatype send_type;
@@ -4995,7 +4832,7 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
FUNC_ENTER_PACKAGE
- assert(chunk_list || 0 == chunk_list_num_entries);
+ assert(chunk_list);
assert(io_info);
assert(di);
assert(idx_info);
@@ -5013,7 +4850,7 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
* Make sure it's safe to cast this rank's number
* of chunks to be sent into an int for MPI
*/
- H5_CHECK_OVERFLOW(chunk_list_num_entries, size_t, int);
+ H5_CHECK_OVERFLOW(chunk_list->num_chunk_infos, size_t, int);
/* Create derived datatypes for the chunk re-insertion info needed */
if (H5D__mpio_get_chunk_insert_info_types(&recv_type, &recv_type_derived, &send_type,
@@ -5053,9 +4890,9 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
}
/* Perform gather operation */
- if (H5_mpio_gatherv_alloc(chunk_list, (int)chunk_list_num_entries, send_type, counts_ptr,
- displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size,
- &gathered_array, &collective_num_entries) < 0)
+ if (H5_mpio_gatherv_alloc(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type,
+ counts_ptr, displacements_ptr, recv_type, TRUE, 0, io_info->comm, mpi_rank,
+ mpi_size, &gathered_array, &collective_num_entries) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL,
"can't gather chunk index re-insertion info to/from ranks")
}
@@ -5067,9 +4904,9 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
* contributing before performing the actual gather operation. Use
* the 'simple' MPI_Allgatherv wrapper for this.
*/
- if (H5_mpio_gatherv_alloc_simple(chunk_list, (int)chunk_list_num_entries, send_type, recv_type, TRUE,
- 0, io_info->comm, mpi_rank, mpi_size, &gathered_array,
- &collective_num_entries) < 0)
+ if (H5_mpio_gatherv_alloc_simple(chunk_list->chunk_infos, (int)chunk_list->num_chunk_infos, send_type,
+ recv_type, TRUE, 0, io_info->comm, mpi_rank, mpi_size,
+ &gathered_array, &collective_num_entries) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL,
"can't gather chunk index re-insertion info to/from ranks")
}
@@ -5128,10 +4965,11 @@ H5D__mpio_collective_filtered_chunk_reinsert(H5D_filtered_collective_io_info_t *
* the calculated coordinates above to make sure
* they match.
*/
- for (size_t dbg_idx = 0; dbg_idx < chunk_list_num_entries; dbg_idx++) {
- if (coll_entry->index_info.chunk_idx == chunk_list[dbg_idx].index_info.chunk_idx) {
- hbool_t coords_match = !memcmp(scaled_coords, chunk_list[dbg_idx].chunk_info->scaled,
- di->dset->shared->ndims * sizeof(hsize_t));
+ for (size_t dbg_idx = 0; dbg_idx < chunk_list->num_chunk_infos; dbg_idx++) {
+ if (coll_entry->index_info.chunk_idx == chunk_list->chunk_infos[dbg_idx].index_info.chunk_idx) {
+ hbool_t coords_match =
+ !memcmp(scaled_coords, chunk_list->chunk_infos[dbg_idx].chunk_info->scaled,
+ di->dset->shared->ndims * sizeof(hsize_t));
assert(coords_match && "Calculated scaled coordinates for chunk didn't match "
"chunk's actual scaled coordinates!");
@@ -5169,7 +5007,7 @@ done:
* Function: H5D__mpio_get_chunk_redistribute_info_types
*
* Purpose: Constructs MPI derived datatypes for communicating the
- * info from a H5D_filtered_collective_io_info_t structure
+ * info from a H5D_filtered_collective_chunk_info_t structure
* that is necessary for redistributing shared chunks during a
* collective write of filtered chunks.
*
@@ -5179,7 +5017,7 @@ done:
* type.
*
* The datatype returned through `resized_type` has an extent
- * equal to the size of an H5D_filtered_collective_io_info_t
+ * equal to the size of an H5D_filtered_collective_chunk_info_t
* structure. This makes it suitable for sending an array of
* those structures, while extracting out just the info
* necessary for the chunk redistribution operation during
@@ -5250,7 +5088,7 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t *
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
/* Create struct type to extract the chunk_current, chunk_idx, orig_owner,
- * new_owner and num_writers fields from a H5D_filtered_collective_io_info_t
+ * new_owner and num_writers fields from a H5D_filtered_collective_chunk_info_t
* structure
*/
block_lengths[0] = 1;
@@ -5258,11 +5096,11 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t *
block_lengths[2] = 1;
block_lengths[3] = 1;
block_lengths[4] = 1;
- displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_current);
- displacements[1] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx);
- displacements[2] = offsetof(H5D_filtered_collective_io_info_t, orig_owner);
- displacements[3] = offsetof(H5D_filtered_collective_io_info_t, new_owner);
- displacements[4] = offsetof(H5D_filtered_collective_io_info_t, num_writers);
+ displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current);
+ displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx);
+ displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, orig_owner);
+ displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, new_owner);
+ displacements[4] = offsetof(H5D_filtered_collective_chunk_info_t, num_writers);
types[0] = chunk_block_type;
types[1] = HSIZE_AS_MPI_TYPE;
types[2] = MPI_INT;
@@ -5274,7 +5112,7 @@ H5D__mpio_get_chunk_redistribute_info_types(MPI_Datatype *contig_type, hbool_t *
struct_type_derived = TRUE;
if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized(
- struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type)))
+ struct_type, 0, sizeof(H5D_filtered_collective_chunk_info_t), resized_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code)
*resized_type_derived = TRUE;
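The extent-resizing trick used above generalizes beyond HDF5: a struct type picks out a subset of fields, and resizing its extent to the size of the host struct lets an array of host structs be sent directly. A self-contained toy sketch of the same pattern (all names illustrative, not HDF5 internals):

    #include <mpi.h>
    #include <stddef.h>

    /* Toy host struct: only two of its fields should travel over MPI */
    typedef struct {
        double payload[8]; /* fields we don't want to send */
        long   chunk_idx;  /* fields we do want to send    */
        int    owner;
    } host_t;

    /* Build a struct type that picks two fields out of host_t, then resize
     * its extent to sizeof(host_t) so element i of a host_t array is read
     * from offset i * sizeof(host_t) when sending an array directly.
     */
    static int
    make_picked_type(MPI_Datatype *resized_out)
    {
        MPI_Datatype struct_type;
        int          block_lengths[2] = {1, 1};
        MPI_Aint     displacements[2] = {offsetof(host_t, chunk_idx), offsetof(host_t, owner)};
        MPI_Datatype types[2]         = {MPI_LONG, MPI_INT};

        if (MPI_SUCCESS != MPI_Type_create_struct(2, block_lengths, displacements, types, &struct_type))
            return -1;

        if (MPI_SUCCESS != MPI_Type_create_resized(struct_type, 0, (MPI_Aint)sizeof(host_t), resized_out)) {
            MPI_Type_free(&struct_type);
            return -1;
        }
        MPI_Type_free(&struct_type); /* resized_out keeps its own reference */

        return (MPI_SUCCESS == MPI_Type_commit(resized_out)) ? 0 : -1;
    }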
@@ -5311,16 +5149,16 @@ done:
* Function: H5D__mpio_get_chunk_alloc_info_types
*
* Purpose: Constructs MPI derived datatypes for communicating the info
- * from a H5D_filtered_collective_io_info_t structure that is
- * necessary for re-allocating file space during a collective
- * write of filtered chunks.
+ * from a H5D_filtered_collective_chunk_info_t structure that
+ * is necessary for re-allocating file space during a
+ * collective write of filtered chunks.
*
* The datatype returned through `contig_type` has an extent
* equal to the size of an H5D_chunk_alloc_info_t structure
* and is suitable for communicating that structure type.
*
* The datatype returned through `resized_type` has an extent
- * equal to the size of an H5D_filtered_collective_io_info_t
+ * equal to the size of an H5D_filtered_collective_chunk_info_t
* structure. This makes it suitable for sending an array of
* those structures, while extracting out just the info
* necessary for the chunk file space reallocation operation
@@ -5385,14 +5223,14 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hbool_t *contig_
/*
* Create struct type to extract the chunk_current, chunk_new and chunk_idx
- * fields from a H5D_filtered_collective_io_info_t structure
+ * fields from a H5D_filtered_collective_chunk_info_t structure
*/
block_lengths[0] = 1;
block_lengths[1] = 1;
block_lengths[2] = 1;
- displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_current);
- displacements[1] = offsetof(H5D_filtered_collective_io_info_t, chunk_new);
- displacements[2] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx);
+ displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_current);
+ displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new);
+ displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx);
types[0] = chunk_block_type;
types[1] = chunk_block_type;
types[2] = HSIZE_AS_MPI_TYPE;
@@ -5402,7 +5240,7 @@ H5D__mpio_get_chunk_alloc_info_types(MPI_Datatype *contig_type, hbool_t *contig_
struct_type_derived = TRUE;
if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized(
- struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type)))
+ struct_type, 0, sizeof(H5D_filtered_collective_chunk_info_t), resized_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code)
*resized_type_derived = TRUE;
@@ -5442,7 +5280,7 @@ done:
* information necessary when reinserting chunks into a
* dataset's chunk index. This includes the chunk's new offset
* and size (H5F_block_t) and the inner `index_info` structure
- * of a H5D_filtered_collective_io_info_t structure.
+ * of a H5D_filtered_collective_chunk_info_t structure.
*
* The datatype returned through `contig_type` has an extent
* equal to the size of an H5D_chunk_insert_info_t structure
@@ -5450,9 +5288,9 @@ done:
*
* The datatype returned through `resized_type` has an extent
* equal to the size of the encompassing
- * H5D_filtered_collective_io_info_t structure. This makes it
- * suitable for sending an array of
- * H5D_filtered_collective_io_info_t structures, while
+ * H5D_filtered_collective_chunk_info_t structure. This makes
+ * it suitable for sending an array of
+ * H5D_filtered_collective_chunk_info_t structures, while
* extracting out just the information needed during
* communication.
*
@@ -5531,20 +5369,20 @@ H5D__mpio_get_chunk_insert_info_types(MPI_Datatype *contig_type, hbool_t *contig
/*
* Create struct type to correctly extract all needed
- * information from a H5D_filtered_collective_io_info_t
+ * information from a H5D_filtered_collective_chunk_info_t
* structure.
*/
- displacements[0] = offsetof(H5D_filtered_collective_io_info_t, chunk_new);
- displacements[1] = offsetof(H5D_filtered_collective_io_info_t, index_info.chunk_idx);
- displacements[2] = offsetof(H5D_filtered_collective_io_info_t, index_info.filter_mask);
- displacements[3] = offsetof(H5D_filtered_collective_io_info_t, index_info.need_insert);
+ displacements[0] = offsetof(H5D_filtered_collective_chunk_info_t, chunk_new);
+ displacements[1] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.chunk_idx);
+ displacements[2] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.filter_mask);
+ displacements[3] = offsetof(H5D_filtered_collective_chunk_info_t, index_info.need_insert);
if (MPI_SUCCESS !=
(mpi_code = MPI_Type_create_struct(field_count, block_lengths, displacements, types, &struct_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_struct failed", mpi_code)
struct_type_derived = TRUE;
if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized(
- struct_type, 0, sizeof(H5D_filtered_collective_io_info_t), resized_type)))
+ struct_type, 0, sizeof(H5D_filtered_collective_chunk_info_t), resized_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_resized failed", mpi_code)
*resized_type_derived = TRUE;
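Pairing the two datatypes this way means the resized type is used on the send side over an array of the large structs, while the receive side packs compact records contiguously. A short illustrative fragment, building on the toy sketch earlier (names hypothetical):

    /* Illustrative only: gather compact records out of an array of large
     * host structs. send_type is the resized "picked" type; recv_type is a
     * contiguous type whose extent equals sizeof(record_t).
     */
    typedef struct {
        long chunk_idx;
        int  owner;
    } record_t;

    static void
    gather_records(const host_t *hosts, int n_local, MPI_Datatype send_type,
                   record_t *records, const int *counts, const int *displs,
                   MPI_Datatype recv_type, MPI_Comm comm)
    {
        /* Sender strides by sizeof(host_t); root receives record_t back to back */
        MPI_Gatherv(hosts, n_local, send_type, records, counts, displs, recv_type, 0, comm);
    }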
@@ -5578,274 +5416,148 @@ done:
} /* end H5D__mpio_get_chunk_insert_info_types() */
/*-------------------------------------------------------------------------
- * Function: H5D__mpio_collective_filtered_io_type
+ * Function: H5D__mpio_collective_filtered_vec_io
*
- * Purpose: Constructs a MPI derived datatype for both the memory and
- * the file for a collective I/O operation on filtered chunks.
- * The datatype contains the chunk offsets and lengths in the
- * file and the locations of the chunk data buffers to read
- * into/write from.
+ * Purpose: Given a pointer to a H5D_filtered_collective_io_info_t
+ * structure with information about collective filtered chunk
+ * I/O, populates I/O vectors and performs vector I/O on those
+ * chunks.
*
* Return: Non-negative on success/Negative on failure
*
*-------------------------------------------------------------------------
*/
static herr_t
-H5D__mpio_collective_filtered_io_type(H5D_filtered_collective_io_info_t *chunk_list, size_t num_entries,
- H5D_io_op_type_t op_type, MPI_Datatype *new_mem_type,
- hbool_t *mem_type_derived, MPI_Datatype *new_file_type,
- hbool_t *file_type_derived)
+H5D__mpio_collective_filtered_vec_io(const H5D_filtered_collective_io_info_t *chunk_list, H5F_shared_t *f_sh,
+ H5D_io_op_type_t op_type)
{
- MPI_Aint *io_buf_array = NULL; /* Relative displacements of filtered chunk data buffers */
- MPI_Aint *file_offset_array = NULL; /* Chunk offsets in the file */
- int *length_array = NULL; /* Filtered Chunk lengths */
- int mpi_code;
- herr_t ret_value = SUCCEED;
+ const void **io_wbufs = NULL;
+ void **io_rbufs = NULL;
+ H5FD_mem_t io_types[2];
+ uint32_t iovec_count = 0;
+ haddr_t *io_addrs = NULL;
+ size_t *io_sizes = NULL;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE
- assert(chunk_list || 0 == num_entries);
- assert(new_mem_type);
- assert(mem_type_derived);
- assert(new_file_type);
- assert(file_type_derived);
+ assert(chunk_list);
+ assert(f_sh);
- *mem_type_derived = FALSE;
- *file_type_derived = FALSE;
- *new_mem_type = MPI_BYTE;
- *new_file_type = MPI_BYTE;
+ if (op_type == H5D_IO_OP_WRITE)
+ iovec_count = (uint32_t)chunk_list->num_chunk_infos;
+ else {
+ assert(chunk_list->num_chunks_to_read <= chunk_list->num_chunk_infos);
+ iovec_count = (uint32_t)chunk_list->num_chunks_to_read;
+ }
- if (num_entries > 0) {
- H5F_block_t *chunk_block;
- size_t last_valid_idx = 0;
- size_t i;
- int chunk_count;
+ if (iovec_count > 0) {
+ if (chunk_list->num_chunk_infos > UINT32_MAX)
+ HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL,
+ "number of chunk entries in I/O operation exceeds UINT32_MAX")
- /*
- * Determine number of chunks for I/O operation and
- * setup for derived datatype creation if I/O operation
- * includes multiple chunks
- */
- if (num_entries == 1) {
- /* Set last valid index to 0 for contiguous datatype creation */
- last_valid_idx = 0;
+ if (NULL == (io_addrs = H5MM_malloc(iovec_count * sizeof(*io_addrs))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O addresses vector")
+ if (NULL == (io_sizes = H5MM_malloc(iovec_count * sizeof(*io_sizes))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector")
- if (op_type == H5D_IO_OP_WRITE)
- chunk_count = 1;
- else
- chunk_count = chunk_list[0].need_read ? 1 : 0;
+ if (op_type == H5D_IO_OP_WRITE) {
+ if (NULL == (io_wbufs = H5MM_malloc(iovec_count * sizeof(*io_wbufs))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
+ "couldn't allocate space for I/O buffers vector")
}
else {
- MPI_Aint chunk_buf;
- MPI_Aint base_buf;
- haddr_t base_offset = HADDR_UNDEF;
+ if (NULL == (io_rbufs = H5MM_malloc(iovec_count * sizeof(*io_rbufs))))
+ HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
+ "couldn't allocate space for I/O buffers vector")
+ }
- H5_CHECK_OVERFLOW(num_entries, size_t, int);
+ /*
+         * Since all I/O will be raw data, we can save a bit of memory by
+         * using H5FD_MEM_NOLIST to signal that the memory type is the same
+         * across all entries in the I/O vectors.
+ */
+ io_types[0] = H5FD_MEM_DRAW;
+ io_types[1] = H5FD_MEM_NOLIST;
- /* Allocate arrays */
- if (NULL == (length_array = H5MM_malloc((size_t)num_entries * sizeof(int))))
- HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
- "memory allocation failed for filtered collective I/O length array")
- if (NULL == (io_buf_array = H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint))))
- HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
- "memory allocation failed for filtered collective I/O buf length array")
- if (NULL == (file_offset_array = H5MM_malloc((size_t)num_entries * sizeof(MPI_Aint))))
- HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
- "memory allocation failed for filtered collective I/O offset array")
+ for (size_t i = 0, vec_idx = 0; i < chunk_list->num_chunk_infos; i++) {
+ H5F_block_t *chunk_block;
/*
- * If doing a write, we can set the base chunk offset
- * and base chunk data buffer right away.
- *
- * If doing a read, some chunks may be skipped over
- * for reading if they aren't yet allocated in the
- * file. Therefore, we have to find the first chunk
- * actually being read in order to set the base chunk
- * offset and base chunk data buffer.
+             * Skip any chunk that isn't participating in this operation, then
+             * check that we aren't going to write past the end of the allocated
+             * I/O vector buffers in case the bookkeeping for the chunk list
+             * struct's `num_chunks_to_read` field wasn't done properly.
*/
- if (op_type == H5D_IO_OP_WRITE) {
-#if H5_CHECK_MPI_VERSION(3, 0)
- if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[0].buf, &base_buf)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
-#else
- base_buf = (MPI_Aint)chunk_list[0].buf;
-#endif
-
- base_offset = chunk_list[0].chunk_new.offset;
- }
-
- for (i = 0, chunk_count = 0; i < num_entries; i++) {
- if (op_type == H5D_IO_OP_READ) {
- /*
- * If this chunk isn't being read, don't add it
- * to the MPI type we're building up for I/O
- */
- if (!chunk_list[i].need_read)
- continue;
-
- /*
- * If this chunk is being read, go ahead and
- * set the base chunk offset and base chunk
- * data buffer if we haven't already
- */
- if (!H5_addr_defined(base_offset)) {
-#if H5_CHECK_MPI_VERSION(3, 0)
- if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[i].buf, &base_buf)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
-#else
- base_buf = (MPI_Aint)chunk_list[i].buf;
-#endif
-
- base_offset = chunk_list[i].chunk_current.offset;
- }
- }
+            if (op_type == H5D_IO_OP_READ && !chunk_list->chunk_infos[i].need_read)
+                continue;
+
+            assert(vec_idx < iovec_count);
- /* Set convenience pointer for current chunk block */
- chunk_block =
- (op_type == H5D_IO_OP_READ) ? &chunk_list[i].chunk_current : &chunk_list[i].chunk_new;
-
- /*
- * Set the current chunk entry's offset in the file, relative to
- * the first chunk entry
- */
- assert(H5_addr_defined(chunk_block->offset));
- file_offset_array[chunk_count] = (MPI_Aint)(chunk_block->offset - base_offset);
- /*
- * Ensure the chunk list is sorted in ascending ordering of
- * offset in the file
- */
- if (chunk_count)
- assert(file_offset_array[chunk_count] > file_offset_array[chunk_count - 1]);
+ /* Set convenience pointer for current chunk block */
+ chunk_block = (op_type == H5D_IO_OP_READ) ? &chunk_list->chunk_infos[i].chunk_current
+ : &chunk_list->chunk_infos[i].chunk_new;
- /* Set the current chunk entry's size for the I/O operation */
- H5_CHECK_OVERFLOW(chunk_block->length, hsize_t, int);
- length_array[chunk_count] = (int)chunk_block->length;
+ assert(H5_addr_defined(chunk_block->offset));
+ io_addrs[vec_idx] = chunk_block->offset;
- /*
- * Set the displacement of the chunk entry's chunk data buffer,
- * relative to the first entry's data buffer
- */
-#if H5_CHECK_MPI_VERSION(3, 1)
- if (MPI_SUCCESS != (mpi_code = MPI_Get_address(chunk_list[i].buf, &chunk_buf)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
-
- io_buf_array[chunk_count] = MPI_Aint_diff(chunk_buf, base_buf);
-#else
- chunk_buf = (MPI_Aint)chunk_list[i].buf;
- io_buf_array[chunk_count] = chunk_buf - base_buf;
+ /*
+             * Ensure the chunk list is sorted in ascending order of
+             * offset in the file. Note that the comparison requires each
+             * address to be strictly greater than the previous one, since
+             * a file address should appear in the chunk list only once.
+ */
+#ifndef NDEBUG
+ if (vec_idx > 0)
+ assert(io_addrs[vec_idx] > io_addrs[vec_idx - 1]);
#endif
- /*
- * Set last valid index in case only a single chunk will
- * be involved in the I/O operation
- */
- last_valid_idx = i;
-
- chunk_count++;
- } /* end for */
- }
-
- /*
- * Create derived datatypes for the chunk list if this
- * rank has any chunks to work on
- */
- if (chunk_count > 0) {
- if (chunk_count == 1) {
- int chunk_len;
-
- /* Single chunk - use a contiguous type for both memory and file */
-
- /* Ensure that we can cast chunk size to an int for MPI */
- chunk_block = (op_type == H5D_IO_OP_READ) ? &chunk_list[last_valid_idx].chunk_current
- : &chunk_list[last_valid_idx].chunk_new;
- H5_CHECKED_ASSIGN(chunk_len, int, chunk_block->length, hsize_t);
+ io_sizes[vec_idx] = (size_t)chunk_block->length;
- if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(chunk_len, MPI_BYTE, new_file_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code)
- *new_mem_type = *new_file_type;
-
- /*
- * Since we use the same datatype for both memory and file, only
- * mark the file type as derived so the caller doesn't try to
- * free the same type twice
- */
- *mem_type_derived = FALSE;
- *file_type_derived = TRUE;
+ if (op_type == H5D_IO_OP_WRITE)
+ io_wbufs[vec_idx] = chunk_list->chunk_infos[i].buf;
+ else
+ io_rbufs[vec_idx] = chunk_list->chunk_infos[i].buf;
- if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- }
- else {
- assert(file_offset_array);
- assert(length_array);
- assert(io_buf_array);
-
- /* Multiple chunks - use an hindexed type for both memory and file */
-
- /* Create memory MPI type */
- if (MPI_SUCCESS != (mpi_code = MPI_Type_create_hindexed(
- chunk_count, length_array, io_buf_array, MPI_BYTE, new_mem_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
- *mem_type_derived = TRUE;
-
- if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_mem_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
-
- /* Create file MPI type */
- if (MPI_SUCCESS !=
- (mpi_code = MPI_Type_create_hindexed(chunk_count, length_array, file_offset_array,
- MPI_BYTE, new_file_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
- *file_type_derived = TRUE;
-
- if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_file_type)))
- HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
- }
+ vec_idx++;
}
- } /* end if */
-
-done:
- if (file_offset_array)
- H5MM_free(file_offset_array);
- if (io_buf_array)
- H5MM_free(io_buf_array);
- if (length_array)
- H5MM_free(length_array);
+ }
- if (ret_value < 0) {
- if (*file_type_derived) {
- if (MPI_SUCCESS != (mpi_code = MPI_Type_free(new_file_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- *file_type_derived = FALSE;
- }
- if (*mem_type_derived) {
- if (MPI_SUCCESS != (mpi_code = MPI_Type_free(new_mem_type)))
- HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
- *mem_type_derived = FALSE;
- }
+ if (op_type == H5D_IO_OP_WRITE) {
+ if (H5F_shared_vector_write(f_sh, iovec_count, io_types, io_addrs, io_sizes, io_wbufs) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed")
+ }
+ else {
+ if (H5F_shared_vector_read(f_sh, iovec_count, io_types, io_addrs, io_sizes, io_rbufs) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "vector read call failed")
}
+done:
+ H5MM_free(io_wbufs);
+ H5MM_free(io_rbufs);
+ H5MM_free(io_sizes);
+ H5MM_free(io_addrs);
+
FUNC_LEAVE_NOAPI(ret_value)
-} /* end H5D__mpio_collective_filtered_io_type() */
+}
#ifdef H5Dmpio_DEBUG
static herr_t
-H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list,
- size_t chunk_list_num_entries, int mpi_rank)
+H5D__mpio_dump_collective_filtered_chunk_list(H5D_filtered_collective_io_info_t *chunk_list, int mpi_rank)
{
- H5D_filtered_collective_io_info_t *chunk_entry;
- size_t i;
- herr_t ret_value = SUCCEED;
+ H5D_filtered_collective_chunk_info_t *chunk_entry;
+ size_t i;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_PACKAGE_NOERR
H5D_MPIO_DEBUG(mpi_rank, "CHUNK LIST: [");
- for (i = 0; i < chunk_list_num_entries; i++) {
+ for (i = 0; i < chunk_list->num_chunk_infos; i++) {
unsigned chunk_rank;
- chunk_entry = &chunk_list[i];
+ chunk_entry = &chunk_list->chunk_infos[i];
assert(chunk_entry->chunk_info);
chunk_rank = (unsigned)H5S_GET_EXTENT_NDIMS(chunk_entry->chunk_info->fspace);
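The vector-assembly loop in H5D__mpio_collective_filtered_vec_io above boils down to a pack-and-skip pattern: walk the chunk list, skip non-participating entries, and advance a separate vector index while checking that file addresses stay in ascending order. A self-contained toy version of that pattern (types simplified, not HDF5 internals):

    #include <assert.h>
    #include <stddef.h>

    /* Simplified stand-ins for the chunk entries and I/O vectors above */
    typedef struct {
        unsigned long long addr; /* file offset of the chunk block  */
        size_t             len;  /* length of the chunk block       */
        void              *buf;  /* chunk data buffer               */
        int                skip; /* e.g. need_read == FALSE on read */
    } entry_t;

    static size_t
    pack_io_vectors(const entry_t *entries, size_t n_entries,
                    unsigned long long *io_addrs, size_t *io_sizes, void **io_bufs)
    {
        size_t vec_idx = 0;

        for (size_t i = 0; i < n_entries; i++) {
            if (entries[i].skip)
                continue;

            io_addrs[vec_idx] = entries[i].addr;
            io_sizes[vec_idx] = entries[i].len;
            io_bufs[vec_idx]  = entries[i].buf;

            /* Entries must already be sorted by ascending file address */
            if (vec_idx > 0)
                assert(io_addrs[vec_idx] > io_addrs[vec_idx - 1]);

            vec_idx++;
        }

        return vec_idx; /* number of I/O vector entries populated */
    }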
diff --git a/src/H5Fio.c b/src/H5Fio.c
index 1924004..09d57e8 100644
--- a/src/H5Fio.c
+++ b/src/H5Fio.c
@@ -328,8 +328,16 @@ H5F_shared_vector_read(H5F_shared_t *f_sh, uint32_t count, H5FD_mem_t types[], h
* for now, assume the caller has done this already.
*/
#ifndef NDEBUG
- for (uint32_t i = 0; i < count; i++)
+ for (uint32_t i = 0; i < count; i++) {
+ /* Stop checking early if H5FD_MEM_NOLIST was specified,
+ * since the caller may not have passed a full
+ * 'count'-sized array for 'types'
+ */
+ if (i > 0 && types[i] == H5FD_MEM_NOLIST)
+ break;
+
assert(types[i] != H5FD_MEM_GHEAP);
+ }
#endif
/* Pass down to file driver layer (bypass page buffer for now) */
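Condensing what the H5Dmpio.c changes above do, a caller signals a uniform memory type with a two-element 'types' array; the sentinel tells lower layers (and the debug loop here) not to index past the first element:

    /* Caller-side convention, as used by the parallel compression code above:
     * only io_types[0] is meaningful; H5FD_MEM_NOLIST marks the end of the list.
     */
    H5FD_mem_t io_types[2];

    io_types[0] = H5FD_MEM_DRAW;   /* every vector entry is raw chunk data */
    io_types[1] = H5FD_MEM_NOLIST; /* sentinel: don't read further types   */

    /* 'count' may be much larger than 2; only io_types[0] is consulted */
    if (H5F_shared_vector_read(f_sh, count, io_types, io_addrs, io_sizes, io_rbufs) < 0)
        /* ...handle error... */;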
@@ -373,8 +381,16 @@ H5F_shared_vector_write(H5F_shared_t *f_sh, uint32_t count, H5FD_mem_t types[],
* for now, assume the caller has done this already.
*/
#ifndef NDEBUG
- for (uint32_t i = 0; i < count; i++)
+ for (uint32_t i = 0; i < count; i++) {
+ /* Stop checking early if H5FD_MEM_NOLIST was specified,
+ * since the caller may not have passed a full
+ * 'count'-sized array for 'types'
+ */
+ if (i > 0 && types[i] == H5FD_MEM_NOLIST)
+ break;
+
assert(types[i] != H5FD_MEM_GHEAP);
+ }
#endif
/* Pass down to file driver layer (bypass page buffer for now) */