diff options
author | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-04-11 20:57:31 (GMT) |
---|---|---|
committer | Jordan Henderson <jhenderson@hdfgroup.org> | 2017-04-11 20:57:31 (GMT) |
commit | cbbd6b0df3fd4f34036a612e5ddb9160df8f5be7 (patch) | |
tree | e0ff90fdca10ad2ca21751d69bdc6257444c1aa1 /src/H5Dmpio.c | |
parent | a22597fab45e6c17138d46552e9483f8541d8118 (diff) | |
download | hdf5-cbbd6b0df3fd4f34036a612e5ddb9160df8f5be7.zip hdf5-cbbd6b0df3fd4f34036a612e5ddb9160df8f5be7.tar.gz hdf5-cbbd6b0df3fd4f34036a612e5ddb9160df8f5be7.tar.bz2 |
Minimize size of H5D_filtered_collective_io_info_t struct
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r-- | src/H5Dmpio.c | 306 |
1 files changed, 102 insertions, 204 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 622cdbb..9ce6c9e 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -95,25 +95,24 @@ typedef struct H5D_chunk_addr_info_t { H5D_chunk_info_t chunk_info; } H5D_chunk_addr_info_t; -/* Information about a single chunk when performing collective filtered IO */ +/* Information about a single chunk when performing collective filtered I/O */ typedef struct H5D_filtered_collective_io_info_t { - H5D_chunk_info_t chunk_info; /* Info about this chunk, such as chunk index and file and memory dataspace */ - H5F_block_t old_chunk; /* The address in the file and size of this chunk before being filtered */ - H5F_block_t new_chunk; /* The address in the file and size of this chunk after being filtered */ - hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */ - size_t io_size; /* Size of the I/O to this chunk */ - size_t num_writers; /* Total number of processes writing to this chunk */ + hsize_t index; /* "Index" of chunk in dataset */ + hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */ + hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */ + size_t num_writers; /* Total number of processes writing to this chunk */ + size_t io_size; /* Size of the I/O to this chunk */ + void *buf; /* Chunk data to be written to file/that has been read from file*/ struct { - int previous_owner; - int new_owner; - } owners; - -#if 0 - int owner; /* Process which will be writing to this chunk */ -#endif + H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */ + H5F_block_t new_chunk; /* The address in the file and size of this chunk after the I/O and filtering operations */ + } chunk_states; - void *buf; /* Chunk data to be written to file/that has been read from file*/ + struct { + int original_owner; /* The process which originally had this chunk selected in the I/O operation */ + int new_owner; /* The process which the chunk has been re-assigned to */ + } owners; } H5D_filtered_collective_io_info_t; /********************/ @@ -162,7 +161,7 @@ static herr_t H5D__mpio_filtered_collective_write_type( MPI_Datatype *new_mem_type, hbool_t *mem_type_derived, MPI_Datatype *new_file_type, hbool_t *file_type_derived); static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, - const H5D_io_info_t *io_info, const H5D_type_info_t *type_info); + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm); static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2); static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2); @@ -812,6 +811,8 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf } #endif +io_option = H5D_MULTI_CHUNK_IO; + /* step 2: Go ahead to do IO.*/ switch (io_option) { case H5D_ONE_LINK_CHUNK_IO: @@ -1363,7 +1364,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in */ for (i = 0; i < chunk_list_num_entries; i++) if (mpi_rank == chunk_list[i].owners.new_owner) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") /* Gather the new chunk sizes to all processes for a collective reallocation @@ -1378,8 +1379,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in for (i = 0; i < collective_chunk_list_num_entries; i++) { hbool_t insert; - if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].old_chunk, &collective_chunk_list[i].new_chunk, - &insert, collective_chunk_list[i].chunk_info.scaled) < 0) + if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].chunk_states.chunk_current, + &collective_chunk_list[i].chunk_states.new_chunk, &insert, collective_chunk_list[i].scaled) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") } /* end for */ @@ -1438,9 +1439,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in * in this iteration into the chunk index */ for (i = 0; i < collective_chunk_list_num_entries; i++) { - udata.chunk_block = collective_chunk_list[i].new_chunk; - udata.common.scaled = collective_chunk_list[i].chunk_info.scaled; - udata.chunk_idx = collective_chunk_list[i].chunk_info.index; + udata.chunk_block = collective_chunk_list[i].chunk_states.new_chunk; + udata.common.scaled = collective_chunk_list[i].scaled; + udata.chunk_idx = collective_chunk_list[i].index; if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") @@ -1795,7 +1796,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */ for (i = 0; i < chunk_list_num_entries; i++) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry") } /* end if */ else { /* Filtered collective write */ @@ -1848,7 +1849,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner); if (have_chunk_to_process) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") /* Gather the new chunk sizes to all processes for a collective re-allocation @@ -1865,8 +1866,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i for (j = 0; j < collective_chunk_list_num_entries; j++) { hbool_t insert = FALSE; - if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk, - &insert, chunk_list[j].chunk_info.scaled) < 0) + if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].chunk_states.chunk_current, + &collective_chunk_list[j].chunk_states.new_chunk, &insert, chunk_list[j].scaled) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk") } /* end for */ @@ -1889,9 +1890,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i /* Collect the new chunk info back to the local copy, since only the record in the * collective array gets updated by the chunk re-allocation */ - HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[offset].new_chunk, sizeof(chunk_list[i].new_chunk)); + HDmemcpy(&chunk_list[i].chunk_states.new_chunk, &collective_chunk_list[offset].chunk_states.new_chunk, sizeof(chunk_list[i].chunk_states.new_chunk)); - H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t); + H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].chunk_states.new_chunk.length, hsize_t); /* Create MPI memory type for writing to chunk */ if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i]))) @@ -1910,7 +1911,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i mpi_buf_count = 1; /* Set up the base storage address for this operation */ - ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset; + ctg_store.contig.dset_addr = chunk_list[i].chunk_states.new_chunk.offset; /* Override the write buffer to point to the address of the * chunk data buffer @@ -1930,9 +1931,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i * in this iteration into the chunk index */ for (j = 0; j < collective_chunk_list_num_entries; j++) { - udata.chunk_block = collective_chunk_list[j].new_chunk; - udata.common.scaled = collective_chunk_list[j].chunk_info.scaled; - udata.chunk_idx = collective_chunk_list[j].chunk_info.index; + udata.chunk_block = collective_chunk_list[j].chunk_states.new_chunk; + udata.common.scaled = collective_chunk_list[j].scaled; + udata.chunk_idx = collective_chunk_list[j].index; if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index") @@ -2185,8 +2186,8 @@ H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_in FUNC_ENTER_STATIC_NOERR - addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->new_chunk.offset; - addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->new_chunk.offset; + addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->chunk_states.new_chunk.offset; + addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->chunk_states.new_chunk.offset; FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2)) } /* end H5D__cmp_filtered_collective_io_info_entry() */ @@ -2198,8 +2199,8 @@ H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective FUNC_ENTER_STATIC_NOERR - owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.previous_owner; - owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.previous_owner; + owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.original_owner; + owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.original_owner; FUNC_LEAVE_NOAPI(owner1 - owner2) } /* end H5D__cmp_filtered_collective_io_info_entry_owner() */ @@ -2584,7 +2585,7 @@ static herr_t H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries) { - H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially select chunks for this process */ + H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */ H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */ unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ @@ -2631,11 +2632,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address") - local_info_array[i].chunk_info = *chunk_info; - local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block; + local_info_array[i].index = chunk_info->index; + local_info_array[i].chunk_states.chunk_current = local_info_array[i].chunk_states.new_chunk = udata.chunk_block; local_info_array[i].num_writers = 0; - local_info_array[i].owners.previous_owner = local_info_array[i].owners.new_owner = mpi_rank; + local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank; local_info_array[i].buf = NULL; + + HDmemcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled)); if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") @@ -2667,128 +2670,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array") -#if 0 - for (i = 0, num_chunks_selected = 0; i < shared_chunks_info_array_num_entries;) { - H5D_filtered_collective_io_info_t chunk_entry; - haddr_t chunk_addr = shared_chunks_info_array[i].old_chunk.offset; - size_t total_io_size = 0; - size_t num_writers = 0; - size_t max_bytes = 0; - int new_owner = 0; - - /* Set the chunk entry's file dataspace to NULL as a sentinel value. - * Any process which is contributing modifications to this chunk will - * obtain a valid file space while processing duplicates below. Any - * process which still has a NULL file space after processing all of - * the duplicate entries for a shared chunk are assumed to not be - * contributing to the chunk and so will not try to access an invalid - * dataspace when processes are sending chunk data to new owners */ - chunk_entry.chunk_info.fspace = NULL; - - /* Process duplicate entries caused by another process writing - * to the same chunk - */ - do { - /* Store the correct chunk entry information in case this process - * becomes the new chunk's owner. The chunk entry that this process - * contributed will be the only one with a valid dataspace selection - * on that particular process - */ - if (mpi_rank == shared_chunks_info_array[i].owner) - chunk_entry = shared_chunks_info_array[i]; - - /* Add this chunk entry's IO size to the running total */ - total_io_size += shared_chunks_info_array[i].io_size; - - /* New owner of the chunk is determined by the process - * which is writing the most data to the chunk - */ - if (shared_chunks_info_array[i].io_size > max_bytes) { - max_bytes = shared_chunks_info_array[i].io_size; - new_owner = shared_chunks_info_array[i].owner; - } - - num_writers++; - } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == chunk_addr); - - if (mpi_rank == new_owner) { - hssize_t chunk_npoints; - - /* Make sure the new owner will know how many other processes will - * be sending chunk modification data to it - */ - chunk_entry.num_writers = num_writers; - - /* Set the full chunk overwrite status. It is assumed that this is a full - * overwrite of the chunk if the total IO size is equal to the size of the - * chunk. If the IO size is greater than the size of the chunk, there is an - * overlapping write between processes, meaning there is no guarantee on - * the integrity of data in the write operation. However, this still - * represents a full overwrite of the chunk. - */ - if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry.chunk_info.fspace)) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") - - chunk_entry.full_overwrite = (total_io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE; - - /* New owner takes possession of the chunk */ - shared_chunks_info_array[num_chunks_selected++] = chunk_entry; - } /* end if */ - else if (chunk_entry.chunk_info.fspace) { - unsigned char *mod_data_p = NULL; /* Use second pointer since H5S_encode advances pointer */ - hssize_t iter_nelmts; /* Number of points to iterate over for the send operation */ - size_t mod_data_size; - - /* Not the new owner of this chunk, encode the file space selection and - * modification data into a buffer and send it to the new chunk owner */ - - /* Determine size of serialized chunk memory dataspace plus the size - * of the data being written - */ - if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size") - - if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") - - mod_data_size += (size_t) iter_nelmts * type_info->src_type_size; - - if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size))) - HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer") - - /* Serialize the chunk's file dataspace into the buffer */ - mod_data_p = mod_data; - if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace") - - /* Initialize iterator for memory selection */ - if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") - mem_iter_init = TRUE; - - /* Collect the modification data into the buffer */ - if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter, - (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p)) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") - - /* Send modification data to new owner */ - H5_CHECK_OVERFLOW(mod_data_size, size_t, int) - H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int) - if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner, - (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++]))) - HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) - - if (mod_data) { - H5MM_free(mod_data); - mod_data = NULL; - } - if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator") - mem_iter_init = FALSE; - } /* end else */ - } /* end for */ -#endif - /* Rank 0 redistributes any shared chunks to new owners as necessary */ if (mpi_rank == 0) { if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts)))) @@ -2799,7 +2680,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ for (i = 0; i < shared_chunks_info_array_num_entries;) { H5D_filtered_collective_io_info_t chunk_entry; - haddr_t last_seen_addr = shared_chunks_info_array[i].old_chunk.offset; + haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset; size_t set_begin_index = i; size_t total_io_size = 0; size_t max_io_size = 0; @@ -2810,7 +2691,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ do { chunk_entry = shared_chunks_info_array[i]; - send_counts[chunk_entry.owners.previous_owner] += sizeof(chunk_entry); + send_counts[chunk_entry.owners.original_owner] += sizeof(chunk_entry); /* Add this chunk entry's I/O size to the running total */ total_io_size += chunk_entry.io_size; @@ -2820,11 +2701,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ */ if (chunk_entry.io_size > max_io_size) { max_io_size = chunk_entry.io_size; - new_chunk_owner = chunk_entry.owners.previous_owner; + new_chunk_owner = chunk_entry.owners.original_owner; } num_writers++; - } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == last_seen_addr); + } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr); /* Set all of the chunk entries' "new_owner" fields */ for (; set_begin_index < i; set_begin_index++) { @@ -2850,23 +2731,33 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ MPI_BYTE, 0, io_info->comm))) HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code) + if (shared_chunks_info_array) { + H5MM_free(shared_chunks_info_array); + shared_chunks_info_array = NULL; + } + /* Now that the chunks have been redistributed, each process must send its modification data * to the new owners of any of the chunks it previously possessed */ for (i = 0; i < num_chunks_selected; i++) { if (mpi_rank != local_info_array[i].owners.new_owner) { H5D_filtered_collective_io_info_t chunk_entry = local_info_array[i]; + H5D_chunk_info_t *chunk_info = NULL; unsigned char *mod_data_p = NULL; hssize_t iter_nelmts; size_t mod_data_size; + + /* Look up the chunk and get its file and memory dataspaces */ + if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry.index))) + HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list") /* Determine size of serialized chunk file dataspace, plus the size of * the data being written */ - if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0) + if (H5S_encode(chunk_info->fspace, &mod_data, &mod_data_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size") - if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0) + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") mod_data_size += (size_t) iter_nelmts * type_info->src_type_size; @@ -2876,24 +2767,24 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ /* Serialize the chunk's file dataspace into the buffer */ mod_data_p = mod_data; - if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0) + if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace") /* Intialize iterator for memory selection */ - if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0) + if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") mem_iter_init = TRUE; /* Collect the modification data into the buffer */ - if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter, + if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter, (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p)) HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer") /* Send modification data to new owner */ H5_CHECK_OVERFLOW(mod_data_size, size_t, int) - H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int) + H5_CHECK_OVERFLOW(chunk_entry.index, hsize_t, int) if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, chunk_entry.owners.new_owner, - (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++]))) + (int) chunk_entry.index, io_info->comm, &send_requests[num_send_requests++]))) HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) @@ -2922,6 +2813,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ *num_entries = num_chunks_selected; done: + if (shared_chunks_info_array) + H5MM_free(shared_chunks_info_array); if (send_counts) H5MM_free(send_counts); if (send_displacements) @@ -2994,12 +2887,12 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun base_buf = chunk_list[0].buf; for (i = 0; i < num_entries; i++) { - if (chunk_list[i].owners.previous_owner == chunk_list[i].owners.new_owner) { + if (chunk_list[i].owners.original_owner == chunk_list[i].owners.new_owner) { /* Set up the offset in the file, the length of the chunk data, and the relative * displacement of the chunk data write buffer */ - file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset; - length_array[i] = (int) chunk_list[i].new_chunk.length; + file_offset_array[i] = (MPI_Aint) chunk_list[i].chunk_states.new_chunk.offset; + length_array[i] = (int) chunk_list[i].chunk_states.new_chunk.length; write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf; } } /* end for */ @@ -3048,19 +2941,20 @@ done: */ static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry, - const H5D_io_info_t *io_info, const H5D_type_info_t *type_info) + const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm) { - H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */ - unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ - unsigned filter_mask = 0; - hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ - hbool_t mem_iter_init = FALSE; - size_t buf_size; - size_t mod_data_alloced_bytes = 0; - H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */ - void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from + H5D_chunk_info_t *chunk_info = NULL; + H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */ + unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ + unsigned filter_mask = 0; + hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */ + hbool_t mem_iter_init = FALSE; + size_t buf_size; + size_t mod_data_alloced_bytes = 0; + H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */ + void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from application write buffer before scattering out to the chunk data buffer */ - herr_t ret_value = SUCCEED; + herr_t ret_value = SUCCEED; FUNC_ENTER_STATIC @@ -3068,24 +2962,28 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk HDassert(io_info); HDassert(type_info); + /* Look up the chunk and get its file and memory dataspaces */ + if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index))) + HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list") + /* If this is a read operation or a write operation where the chunk is not being fully * overwritten, enough memory must be allocated to read the filtered chunk from the file. * If this is a write operation where the chunk is being fully overwritten, enough memory * must be allocated for the size of the unfiltered chunk. */ if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { - buf_size = chunk_entry->old_chunk.length; + buf_size = chunk_entry->chunk_states.chunk_current.length; } else { hssize_t extent_npoints; - if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0) + if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") buf_size = (hsize_t) extent_npoints * type_info->src_type_size; } - chunk_entry->new_chunk.length = buf_size; + chunk_entry->chunk_states.new_chunk.length = buf_size; if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer") @@ -3095,13 +2993,13 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk */ if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) { /* XXX: Test with MPI types and collective read to improve performance */ - if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset, + if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->chunk_states.chunk_current.offset, buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0) HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk") if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask, io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, - (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0) HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying") } /* end if */ @@ -3109,11 +3007,11 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") - if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->src_type_size) < 0) + if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") mem_iter_init = TRUE; - if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.mspace)) < 0) + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") /* If this is a read operation, scatter the read chunk data to the user's buffer. @@ -3124,7 +3022,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk */ switch (io_info->op_type) { case H5D_IO_OP_READ: - if (H5D__scatter_mem(chunk_entry->buf, chunk_entry->chunk_info.mspace, mem_iter, + if (H5D__scatter_mem(chunk_entry->buf, chunk_info->mspace, mem_iter, (size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer") break; @@ -3134,7 +3032,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer") /* Gather modification data from the application write buffer into a temporary buffer */ - if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter, + if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter, (size_t) iter_nelmts, io_info->dxpl_cache, tmp_gath_buf)) HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer") @@ -3143,17 +3041,17 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk mem_iter_init = FALSE; /* Initialize iterator for file selection */ - if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.fspace, type_info->dst_type_size) < 0) + if (H5S_select_iter_init(mem_iter, chunk_info->fspace, type_info->dst_type_size) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file selection information") mem_iter_init = TRUE; - if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0) + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace)) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") /* Scatter the owner's modification data into the chunk data buffer according to * the file space. */ - if (H5D__scatter_mem(tmp_gath_buf, chunk_entry->chunk_info.fspace, mem_iter, + if (H5D__scatter_mem(tmp_gath_buf, chunk_info->fspace, mem_iter, (size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf) < 0) HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer") @@ -3169,8 +3067,8 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk int mpi_code; /* Probe for the incoming message from another process */ - H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int) - if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index, + H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int) + if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->index, io_info->comm, &status))) HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code) @@ -3186,7 +3084,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk } if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE, - (int) chunk_entry->chunk_info.index, io_info->comm, &status))) + (int) chunk_entry->index, io_info->comm, &status))) HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code) /* Decode the process' chunk file dataspace */ @@ -3221,12 +3119,12 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk /* Filter the chunk */ if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask, io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb, - (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0) + (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0) HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed") #if H5_SIZEOF_SIZE_T > 4 /* Check for the chunk expanding too much to encode in a 32-bit value */ - if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff)) + if (chunk_entry->chunk_states.new_chunk.length > ((size_t) 0xffffffff)) HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length") #endif |