author     Jordan Henderson <jhenderson@hdfgroup.org>  2017-04-11 20:57:31 (GMT)
committer  Jordan Henderson <jhenderson@hdfgroup.org>  2017-04-11 20:57:31 (GMT)
commit     cbbd6b0df3fd4f34036a612e5ddb9160df8f5be7 (patch)
tree       e0ff90fdca10ad2ca21751d69bdc6257444c1aa1 /src/H5Dmpio.c
parent     a22597fab45e6c17138d46552e9483f8541d8118 (diff)
Minimize size of H5D_filtered_collective_io_info_t struct
Diffstat (limited to 'src/H5Dmpio.c')
-rw-r--r--  src/H5Dmpio.c  306
1 file changed, 102 insertions(+), 204 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 622cdbb..9ce6c9e 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -95,25 +95,24 @@ typedef struct H5D_chunk_addr_info_t {
H5D_chunk_info_t chunk_info;
} H5D_chunk_addr_info_t;
-/* Information about a single chunk when performing collective filtered IO */
+/* Information about a single chunk when performing collective filtered I/O */
typedef struct H5D_filtered_collective_io_info_t {
- H5D_chunk_info_t chunk_info; /* Info about this chunk, such as chunk index and file and memory dataspace */
- H5F_block_t old_chunk; /* The address in the file and size of this chunk before being filtered */
- H5F_block_t new_chunk; /* The address in the file and size of this chunk after being filtered */
- hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
- size_t io_size; /* Size of the I/O to this chunk */
- size_t num_writers; /* Total number of processes writing to this chunk */
+ hsize_t index; /* "Index" of chunk in dataset */
+ hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */
+ hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
+ size_t num_writers; /* Total number of processes writing to this chunk */
+ size_t io_size; /* Size of the I/O to this chunk */
+ void *buf; /* Chunk data to be written to file/that has been read from file */
struct {
- int previous_owner;
- int new_owner;
- } owners;
-
-#if 0
- int owner; /* Process which will be writing to this chunk */
-#endif
+ H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */
+ H5F_block_t new_chunk; /* The address in the file and size of this chunk after the I/O and filtering operations */
+ } chunk_states;
- void *buf; /* Chunk data to be written to file/that has been read from file*/
+ struct {
+ int original_owner; /* The process which originally had this chunk selected in the I/O operation */
+ int new_owner; /* The process which the chunk has been re-assigned to */
+ } owners;
} H5D_filtered_collective_io_info_t;
/********************/
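With the embedded H5D_chunk_info_t removed from the entry, any code that needs the chunk's file or memory dataspace now looks the chunk up by its index in the chunk map's skip list. A minimal sketch of that lookup pattern, assuming the surrounding H5Dmpio.c context (fm, chunk_entry and the usual error macros in scope):

    H5D_chunk_info_t *chunk_info = NULL;

    /* Retrieve the full chunk info (fspace/mspace) for a reduced chunk entry */
    if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
        HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")

    /* chunk_info->fspace and chunk_info->mspace now stand in for the former
     * chunk_entry->chunk_info.fspace / chunk_entry->chunk_info.mspace accesses */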
@@ -162,7 +161,7 @@ static herr_t H5D__mpio_filtered_collective_write_type(
MPI_Datatype *new_mem_type, hbool_t *mem_type_derived,
MPI_Datatype *new_file_type, hbool_t *file_type_derived);
static herr_t H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
- const H5D_io_info_t *io_info, const H5D_type_info_t *type_info);
+ const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm);
static int H5D__cmp_chunk_addr(const void *chunk_addr_info1, const void *chunk_addr_info2);
static int H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_info_entry1,
const void *filtered_collective_io_info_entry2);
@@ -812,6 +811,8 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
}
#endif
+io_option = H5D_MULTI_CHUNK_IO;
+
/* step 2: Go ahead to do IO.*/
switch (io_option) {
case H5D_ONE_LINK_CHUNK_IO:
@@ -1363,7 +1364,7 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
*/
for (i = 0; i < chunk_list_num_entries; i++)
if (mpi_rank == chunk_list[i].owners.new_owner)
- if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
/* Gather the new chunk sizes to all processes for a collective reallocation
@@ -1378,8 +1379,8 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
for (i = 0; i < collective_chunk_list_num_entries; i++) {
hbool_t insert;
- if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].old_chunk, &collective_chunk_list[i].new_chunk,
- &insert, collective_chunk_list[i].chunk_info.scaled) < 0)
+ if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[i].chunk_states.chunk_current,
+ &collective_chunk_list[i].chunk_states.new_chunk, &insert, collective_chunk_list[i].scaled) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
} /* end for */
@@ -1438,9 +1439,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
* in this iteration into the chunk index
*/
for (i = 0; i < collective_chunk_list_num_entries; i++) {
- udata.chunk_block = collective_chunk_list[i].new_chunk;
- udata.common.scaled = collective_chunk_list[i].chunk_info.scaled;
- udata.chunk_idx = collective_chunk_list[i].chunk_info.index;
+ udata.chunk_block = collective_chunk_list[i].chunk_states.new_chunk;
+ udata.common.scaled = collective_chunk_list[i].scaled;
+ udata.chunk_idx = collective_chunk_list[i].index;
if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
@@ -1795,7 +1796,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
if (io_info->op_type == H5D_IO_OP_READ) { /* Filtered collective read */
for (i = 0; i < chunk_list_num_entries; i++)
- if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't process chunk entry")
} /* end if */
else { /* Filtered collective write */
@@ -1848,7 +1849,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner);
if (have_chunk_to_process)
- if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info, fm) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
/* Gather the new chunk sizes to all processes for a collective re-allocation
@@ -1865,8 +1866,8 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
for (j = 0; j < collective_chunk_list_num_entries; j++) {
hbool_t insert = FALSE;
- if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].old_chunk, &collective_chunk_list[j].new_chunk,
- &insert, chunk_list[j].chunk_info.scaled) < 0)
+ if (H5D__chunk_file_alloc(&index_info, &collective_chunk_list[j].chunk_states.chunk_current,
+ &collective_chunk_list[j].chunk_states.new_chunk, &insert, chunk_list[j].scaled) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate chunk")
} /* end for */
@@ -1889,9 +1890,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
/* Collect the new chunk info back to the local copy, since only the record in the
* collective array gets updated by the chunk re-allocation */
- HDmemcpy(&chunk_list[i].new_chunk, &collective_chunk_list[offset].new_chunk, sizeof(chunk_list[i].new_chunk));
+ HDmemcpy(&chunk_list[i].chunk_states.new_chunk, &collective_chunk_list[offset].chunk_states.new_chunk, sizeof(chunk_list[i].chunk_states.new_chunk));
- H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].new_chunk.length, hsize_t);
+ H5_CHECKED_ASSIGN(mpi_type_count, int, chunk_list[i].chunk_states.new_chunk.length, hsize_t);
/* Create MPI memory type for writing to chunk */
if (MPI_SUCCESS != (mpi_code = MPI_Type_contiguous(mpi_type_count, MPI_BYTE, &mem_type_array[i])))
@@ -1910,7 +1911,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
mpi_buf_count = 1;
/* Set up the base storage address for this operation */
- ctg_store.contig.dset_addr = chunk_list[i].new_chunk.offset;
+ ctg_store.contig.dset_addr = chunk_list[i].chunk_states.new_chunk.offset;
/* Override the write buffer to point to the address of the
* chunk data buffer
@@ -1930,9 +1931,9 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
* in this iteration into the chunk index
*/
for (j = 0; j < collective_chunk_list_num_entries; j++) {
- udata.chunk_block = collective_chunk_list[j].new_chunk;
- udata.common.scaled = collective_chunk_list[j].chunk_info.scaled;
- udata.chunk_idx = collective_chunk_list[j].chunk_info.index;
+ udata.chunk_block = collective_chunk_list[j].chunk_states.new_chunk;
+ udata.common.scaled = collective_chunk_list[j].scaled;
+ udata.chunk_idx = collective_chunk_list[j].index;
if ((index_info.storage->ops->insert)(&index_info, &udata, io_info->dset) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINSERT, FAIL, "unable to insert chunk address into index")
@@ -2185,8 +2186,8 @@ H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_in
FUNC_ENTER_STATIC_NOERR
- addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->new_chunk.offset;
- addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->new_chunk.offset;
+ addr1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->chunk_states.new_chunk.offset;
+ addr2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->chunk_states.new_chunk.offset;
FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
} /* end H5D__cmp_filtered_collective_io_info_entry() */
@@ -2198,8 +2199,8 @@ H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective
FUNC_ENTER_STATIC_NOERR
- owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.previous_owner;
- owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.previous_owner;
+ owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.original_owner;
+ owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.original_owner;
FUNC_LEAVE_NOAPI(owner1 - owner2)
} /* end H5D__cmp_filtered_collective_io_info_entry_owner() */
@@ -2584,7 +2585,7 @@ static herr_t
H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_type_info_t *type_info,
const H5D_chunk_map_t *fm, H5D_filtered_collective_io_info_t **chunk_list, size_t *num_entries)
{
- H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially select chunks for this process */
+ H5D_filtered_collective_io_info_t *local_info_array = NULL; /* The list of initially selected chunks for this process */
H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
@@ -2631,11 +2632,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (H5D__chunk_lookup(io_info->dset, io_info->md_dxpl_id, chunk_info->scaled, &udata) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "error looking up chunk address")
- local_info_array[i].chunk_info = *chunk_info;
- local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block;
+ local_info_array[i].index = chunk_info->index;
+ local_info_array[i].chunk_states.chunk_current = local_info_array[i].chunk_states.new_chunk = udata.chunk_block;
local_info_array[i].num_writers = 0;
- local_info_array[i].owners.previous_owner = local_info_array[i].owners.new_owner = mpi_rank;
+ local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank;
local_info_array[i].buf = NULL;
+
+ HDmemcpy(local_info_array[i].scaled, chunk_info->scaled, sizeof(chunk_info->scaled));
if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
@@ -2667,128 +2670,6 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
-#if 0
- for (i = 0, num_chunks_selected = 0; i < shared_chunks_info_array_num_entries;) {
- H5D_filtered_collective_io_info_t chunk_entry;
- haddr_t chunk_addr = shared_chunks_info_array[i].old_chunk.offset;
- size_t total_io_size = 0;
- size_t num_writers = 0;
- size_t max_bytes = 0;
- int new_owner = 0;
-
- /* Set the chunk entry's file dataspace to NULL as a sentinel value.
- * Any process which is contributing modifications to this chunk will
- * obtain a valid file space while processing duplicates below. Any
- * process which still has a NULL file space after processing all of
- * the duplicate entries for a shared chunk are assumed to not be
- * contributing to the chunk and so will not try to access an invalid
- * dataspace when processes are sending chunk data to new owners */
- chunk_entry.chunk_info.fspace = NULL;
-
- /* Process duplicate entries caused by another process writing
- * to the same chunk
- */
- do {
- /* Store the correct chunk entry information in case this process
- * becomes the new chunk's owner. The chunk entry that this process
- * contributed will be the only one with a valid dataspace selection
- * on that particular process
- */
- if (mpi_rank == shared_chunks_info_array[i].owner)
- chunk_entry = shared_chunks_info_array[i];
-
- /* Add this chunk entry's IO size to the running total */
- total_io_size += shared_chunks_info_array[i].io_size;
-
- /* New owner of the chunk is determined by the process
- * which is writing the most data to the chunk
- */
- if (shared_chunks_info_array[i].io_size > max_bytes) {
- max_bytes = shared_chunks_info_array[i].io_size;
- new_owner = shared_chunks_info_array[i].owner;
- }
-
- num_writers++;
- } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == chunk_addr);
-
- if (mpi_rank == new_owner) {
- hssize_t chunk_npoints;
-
- /* Make sure the new owner will know how many other processes will
- * be sending chunk modification data to it
- */
- chunk_entry.num_writers = num_writers;
-
- /* Set the full chunk overwrite status. It is assumed that this is a full
- * overwrite of the chunk if the total IO size is equal to the size of the
- * chunk. If the IO size is greater than the size of the chunk, there is an
- * overlapping write between processes, meaning there is no guarantee on
- * the integrity of data in the write operation. However, this still
- * represents a full overwrite of the chunk.
- */
- if ((chunk_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry.chunk_info.fspace)) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-
- chunk_entry.full_overwrite = (total_io_size >= (hsize_t) chunk_npoints * type_info->dst_type_size) ? TRUE : FALSE;
-
- /* New owner takes possession of the chunk */
- shared_chunks_info_array[num_chunks_selected++] = chunk_entry;
- } /* end if */
- else if (chunk_entry.chunk_info.fspace) {
- unsigned char *mod_data_p = NULL; /* Use second pointer since H5S_encode advances pointer */
- hssize_t iter_nelmts; /* Number of points to iterate over for the send operation */
- size_t mod_data_size;
-
- /* Not the new owner of this chunk, encode the file space selection and
- * modification data into a buffer and send it to the new chunk owner */
-
- /* Determine size of serialized chunk memory dataspace plus the size
- * of the data being written
- */
- if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
-
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
-
- mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
-
- if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size)))
- HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
-
- /* Serialize the chunk's file dataspace into the buffer */
- mod_data_p = mod_data;
- if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
-
- /* Initialize iterator for memory selection */
- if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
- mem_iter_init = TRUE;
-
- /* Collect the modification data into the buffer */
- if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
- (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
- HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
-
- /* Send modification data to new owner */
- H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
- H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int)
- if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner,
- (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
- HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
-
- if (mod_data) {
- H5MM_free(mod_data);
- mod_data = NULL;
- }
- if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
- mem_iter_init = FALSE;
- } /* end else */
- } /* end for */
-#endif
-
/* Rank 0 redistributes any shared chunks to new owners as necessary */
if (mpi_rank == 0) {
if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts))))
@@ -2799,7 +2680,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
for (i = 0; i < shared_chunks_info_array_num_entries;) {
H5D_filtered_collective_io_info_t chunk_entry;
- haddr_t last_seen_addr = shared_chunks_info_array[i].old_chunk.offset;
+ haddr_t last_seen_addr = shared_chunks_info_array[i].chunk_states.chunk_current.offset;
size_t set_begin_index = i;
size_t total_io_size = 0;
size_t max_io_size = 0;
@@ -2810,7 +2691,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
do {
chunk_entry = shared_chunks_info_array[i];
- send_counts[chunk_entry.owners.previous_owner] += sizeof(chunk_entry);
+ send_counts[chunk_entry.owners.original_owner] += sizeof(chunk_entry);
/* Add this chunk entry's I/O size to the running total */
total_io_size += chunk_entry.io_size;
@@ -2820,11 +2701,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
*/
if (chunk_entry.io_size > max_io_size) {
max_io_size = chunk_entry.io_size;
- new_chunk_owner = chunk_entry.owners.previous_owner;
+ new_chunk_owner = chunk_entry.owners.original_owner;
}
num_writers++;
- } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == last_seen_addr);
+ } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].chunk_states.chunk_current.offset == last_seen_addr);
/* Set all of the chunk entries' "new_owner" fields */
for (; set_begin_index < i; set_begin_index++) {
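The redistribution rule applied in the loop above (rank 0 walks each run of entries that share a chunk address) boils down to: the process writing the most bytes to a shared chunk becomes its new owner, and num_writers counts every contributor. A standalone, illustrative sketch of that rule (the names here are hypothetical, not taken from H5Dmpio.c):

    #include <stddef.h>

    struct shared_entry {
        int    original_owner; /* rank that originally selected the chunk */
        size_t io_size;        /* bytes that rank writes to the chunk */
    };

    /* Return the rank writing the most data to the chunk; it becomes the new owner */
    static int pick_new_owner(const struct shared_entry *run, size_t n, size_t *num_writers)
    {
        size_t max_io_size = 0;
        int    new_owner   = 0;

        *num_writers = n;
        for (size_t i = 0; i < n; i++)
            if (run[i].io_size > max_io_size) {
                max_io_size = run[i].io_size;
                new_owner   = run[i].original_owner;
            }

        return new_owner;
    }

For example, if ranks 0, 1 and 2 write 100, 400 and 250 bytes to the same chunk, rank 1 becomes the new owner and num_writers is 3.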
@@ -2850,23 +2731,33 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
MPI_BYTE, 0, io_info->comm)))
HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
+ if (shared_chunks_info_array) {
+ H5MM_free(shared_chunks_info_array);
+ shared_chunks_info_array = NULL;
+ }
+
/* Now that the chunks have been redistributed, each process must send its modification data
* to the new owners of any of the chunks it previously possessed
*/
for (i = 0; i < num_chunks_selected; i++) {
if (mpi_rank != local_info_array[i].owners.new_owner) {
H5D_filtered_collective_io_info_t chunk_entry = local_info_array[i];
+ H5D_chunk_info_t *chunk_info = NULL;
unsigned char *mod_data_p = NULL;
hssize_t iter_nelmts;
size_t mod_data_size;
+
+ /* Look up the chunk and get its file and memory dataspaces */
+ if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry.index)))
+ HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
/* Determine size of serialized chunk file dataspace, plus the size of
* the data being written
*/
- if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0)
+ if (H5S_encode(chunk_info->fspace, &mod_data, &mod_data_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
@@ -2876,24 +2767,24 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Serialize the chunk's file dataspace into the buffer */
mod_data_p = mod_data;
- if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0)
+ if (H5S_encode(chunk_info->fspace, &mod_data_p, &mod_data_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
/* Initialize iterator for memory selection */
- if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0)
+ if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
/* Collect the modification data into the buffer */
- if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
(size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
/* Send modification data to new owner */
H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
- H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int)
+ H5_CHECK_OVERFLOW(chunk_entry.index, hsize_t, int)
if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, chunk_entry.owners.new_owner,
- (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
+ (int) chunk_entry.index, io_info->comm, &send_requests[num_send_requests++])))
HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
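For reference, the per-chunk modification message assembled in the hunk above has a simple two-part layout, and the chunk index doubles as the MPI tag. An illustrative comment-only sketch (not code from H5Dmpio.c):

    /*
     * mod_data, sent to chunk_entry.owners.new_owner via MPI_Isend with
     * tag = (int) chunk_entry.index:
     *
     *   +-------------------------------+--------------------------------------+
     *   | H5S_encode()'d file dataspace | iter_nelmts * src_type_size bytes     |
     *   | (this rank's selection within | gathered from the application write   |
     *   |  the chunk)                   | buffer by H5D__gather_mem()           |
     *   +-------------------------------+--------------------------------------+
     */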
@@ -2922,6 +2813,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
*num_entries = num_chunks_selected;
done:
+ if (shared_chunks_info_array)
+ H5MM_free(shared_chunks_info_array);
if (send_counts)
H5MM_free(send_counts);
if (send_displacements)
@@ -2994,12 +2887,12 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
base_buf = chunk_list[0].buf;
for (i = 0; i < num_entries; i++) {
- if (chunk_list[i].owners.previous_owner == chunk_list[i].owners.new_owner) {
+ if (chunk_list[i].owners.original_owner == chunk_list[i].owners.new_owner) {
/* Set up the offset in the file, the length of the chunk data, and the relative
* displacement of the chunk data write buffer
*/
- file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
- length_array[i] = (int) chunk_list[i].new_chunk.length;
+ file_offset_array[i] = (MPI_Aint) chunk_list[i].chunk_states.new_chunk.offset;
+ length_array[i] = (int) chunk_list[i].chunk_states.new_chunk.length;
write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
}
} /* end for */
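The offset/length/displacement arrays populated above are the usual ingredients of derived MPI datatypes for a single collective write. A generic sketch of how such arrays are typically combined (standard MPI calls; this is not necessarily the exact construction used by H5D__mpio_filtered_collective_write_type, which lies outside this hunk):

    MPI_Datatype file_type, mem_type;

    /* One block per chunk this process writes: lengths in bytes, displacements
     * as absolute file offsets and as offsets relative to the base write buffer */
    MPI_Type_create_hindexed((int) num_entries, length_array, file_offset_array, MPI_BYTE, &file_type);
    MPI_Type_commit(&file_type);

    MPI_Type_create_hindexed((int) num_entries, length_array, write_buf_array, MPI_BYTE, &mem_type);
    MPI_Type_commit(&mem_type);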
@@ -3048,19 +2941,20 @@ done:
*/
static herr_t
H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk_entry,
- const H5D_io_info_t *io_info, const H5D_type_info_t *type_info)
+ const H5D_io_info_t *io_info, const H5D_type_info_t *type_info, const H5D_chunk_map_t *fm)
{
- H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */
- unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
- unsigned filter_mask = 0;
- hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
- hbool_t mem_iter_init = FALSE;
- size_t buf_size;
- size_t mod_data_alloced_bytes = 0;
- H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
- void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from
+ H5D_chunk_info_t *chunk_info = NULL;
+ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__scatter_mem/H5D__gather_mem */
+ unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
+ unsigned filter_mask = 0;
+ hssize_t iter_nelmts; /* Number of points to iterate over for the chunk IO operation */
+ hbool_t mem_iter_init = FALSE;
+ size_t buf_size;
+ size_t mod_data_alloced_bytes = 0;
+ H5S_t *dataspace = NULL; /* Other process' dataspace for the chunk */
+ void *tmp_gath_buf = NULL; /* Temporary gather buffer for owner of the chunk to gather into from
application write buffer before scattering out to the chunk data buffer */
- herr_t ret_value = SUCCEED;
+ herr_t ret_value = SUCCEED;
FUNC_ENTER_STATIC
@@ -3068,24 +2962,28 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HDassert(io_info);
HDassert(type_info);
+ /* Look up the chunk and get its file and memory dataspaces */
+ if (NULL == (chunk_info = (H5D_chunk_info_t *) H5SL_search(fm->sel_chunks, &chunk_entry->index)))
+ HGOTO_ERROR(H5E_DATASPACE, H5E_NOTFOUND, FAIL, "can't locate chunk in skip list")
+
/* If this is a read operation or a write operation where the chunk is not being fully
* overwritten, enough memory must be allocated to read the filtered chunk from the file.
* If this is a write operation where the chunk is being fully overwritten, enough memory
* must be allocated for the size of the unfiltered chunk.
*/
if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
- buf_size = chunk_entry->old_chunk.length;
+ buf_size = chunk_entry->chunk_states.chunk_current.length;
}
else {
hssize_t extent_npoints;
- if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0)
+ if ((extent_npoints = H5S_GET_EXTENT_NPOINTS(chunk_info->fspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
buf_size = (hsize_t) extent_npoints * type_info->src_type_size;
}
- chunk_entry->new_chunk.length = buf_size;
+ chunk_entry->chunk_states.new_chunk.length = buf_size;
if (NULL == (chunk_entry->buf = H5MM_malloc(buf_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk data buffer")
@@ -3095,13 +2993,13 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
*/
if (!chunk_entry->full_overwrite || io_info->op_type == H5D_IO_OP_READ) {
/* XXX: Test with MPI types and collective read to improve performance */
- if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->old_chunk.offset,
+ if (H5F_block_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, chunk_entry->chunk_states.chunk_current.offset,
buf_size, H5AC_rawdata_dxpl_id, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "unable to read raw data chunk")
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, H5Z_FLAG_REVERSE, &filter_mask,
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
- (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+ (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, FAIL, "couldn't unfilter chunk for modifying")
} /* end if */
@@ -3109,11 +3007,11 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
- if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->src_type_size) < 0)
+ if (H5S_select_iter_init(mem_iter, chunk_info->mspace, type_info->src_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.mspace)) < 0)
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
/* If this is a read operation, scatter the read chunk data to the user's buffer.
@@ -3124,7 +3022,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
*/
switch (io_info->op_type) {
case H5D_IO_OP_READ:
- if (H5D__scatter_mem(chunk_entry->buf, chunk_entry->chunk_info.mspace, mem_iter,
+ if (H5D__scatter_mem(chunk_entry->buf, chunk_info->mspace, mem_iter,
(size_t) iter_nelmts, io_info->dxpl_cache, io_info->u.rbuf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to read buffer")
break;
@@ -3134,7 +3032,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate temporary gather buffer")
/* Gather modification data from the application write buffer into a temporary buffer */
- if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry->chunk_info.mspace, mem_iter,
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_info->mspace, mem_iter,
(size_t) iter_nelmts, io_info->dxpl_cache, tmp_gath_buf))
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
@@ -3143,17 +3041,17 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
mem_iter_init = FALSE;
/* Initialize iterator for file selection */
- if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.fspace, type_info->dst_type_size) < 0)
+ if (H5S_select_iter_init(mem_iter, chunk_info->fspace, type_info->dst_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file selection information")
mem_iter_init = TRUE;
- if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry->chunk_info.fspace)) < 0)
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_info->fspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
/* Scatter the owner's modification data into the chunk data buffer according to
* the file space.
*/
- if (H5D__scatter_mem(tmp_gath_buf, chunk_entry->chunk_info.fspace, mem_iter,
+ if (H5D__scatter_mem(tmp_gath_buf, chunk_info->fspace, mem_iter,
(size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "couldn't scatter to chunk data buffer")
@@ -3169,8 +3067,8 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
int mpi_code;
/* Probe for the incoming message from another process */
- H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int)
- if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index,
+ H5_CHECK_OVERFLOW(chunk_entry->index, hsize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->index,
io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
@@ -3186,7 +3084,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
}
if (MPI_SUCCESS != (mpi_code = MPI_Recv(mod_data, count, MPI_BYTE, MPI_ANY_SOURCE,
- (int) chunk_entry->chunk_info.index, io_info->comm, &status)))
+ (int) chunk_entry->index, io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
/* Decode the process' chunk file dataspace */
@@ -3221,12 +3119,12 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
/* Filter the chunk */
if (H5Z_pipeline(&io_info->dset->shared->dcpl_cache.pline, 0, &filter_mask,
io_info->dxpl_cache->err_detect, io_info->dxpl_cache->filter_cb,
- (size_t *) &chunk_entry->new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
+ (size_t *) &chunk_entry->chunk_states.new_chunk.length, &buf_size, &chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_PLINE, H5E_CANTFILTER, H5_ITER_ERROR, "output pipeline failed")
#if H5_SIZEOF_SIZE_T > 4
/* Check for the chunk expanding too much to encode in a 32-bit value */
- if (chunk_entry->new_chunk.length > ((size_t) 0xffffffff))
+ if (chunk_entry->chunk_states.new_chunk.length > ((size_t) 0xffffffff))
HGOTO_ERROR(H5E_DATASET, H5E_BADRANGE, FAIL, "chunk too large for 32-bit length")
#endif