 src/H5Dmpio.c | 179 ++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 155 insertions(+), 24 deletions(-)
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index e6fbb6a..622cdbb 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -103,7 +103,16 @@ typedef struct H5D_filtered_collective_io_info_t {
hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
size_t io_size; /* Size of the I/O to this chunk */
size_t num_writers; /* Total number of processes writing to this chunk */
+
+ struct {
+ int previous_owner;
+ int new_owner;
+ } owners;
+
+#if 0
int owner; /* Process which will be writing to this chunk */
+#endif
+
void *buf; /* Chunk data to be written to file/that has been read from file*/
} H5D_filtered_collective_io_info_t;
@@ -1353,8 +1362,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in
* then re-filtering the chunk.
*/
for (i = 0; i < chunk_list_num_entries; i++)
- if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
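+ /* Only the process that is the chunk's new owner processes this chunk entry */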
+ if (mpi_rank == chunk_list[i].owners.new_owner)
+ if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry")
/* Gather the new chunk sizes to all processes for a collective reallocation
* of the chunks in the file.
@@ -1835,7 +1845,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i
*/
for (i = 0; i < max_num_chunks; i++) {
/* Check if this process has a chunk to work on for this iteration */
- hbool_t have_chunk_to_process = i < chunk_list_num_entries;
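+ /* A process only has a chunk to work on if it has entries left and is the new owner of this chunk */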
+ hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner);
if (have_chunk_to_process)
if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0)
@@ -2181,6 +2191,19 @@ H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_in
FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2))
} /* end H5D__cmp_filtered_collective_io_info_entry() */
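+/* qsort callback which orders chunk entries by their previous owner, so that the
+ * segment of the gathered list belonging to each rank is contiguous before it is
+ * scattered back
+ */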
+static int
+H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2)
+{
+ int owner1 = -1, owner2 = -1;
+
+ FUNC_ENTER_STATIC_NOERR
+
+ owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.previous_owner;
+ owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.previous_owner;
+
+ FUNC_LEAVE_NOAPI(owner1 - owner2)
+} /* end H5D__cmp_filtered_collective_io_info_entry_owner() */
+
/*-------------------------------------------------------------------------
* Function: H5D__sort_chunk
@@ -2565,14 +2588,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */
H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */
unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */
- H5SL_node_t *chunk_node;
MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */
MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */
hbool_t mem_iter_init = FALSE;
- size_t num_send_requests = 0;
size_t num_chunks_selected;
- size_t shared_chunks_info_array_num_entries;
size_t i;
+ int *send_counts = NULL;
+ int *send_displacements = NULL;
int mpi_rank, mpi_size, mpi_code;
herr_t ret_value = SUCCEED;
@@ -2594,6 +2616,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) {
H5D_chunk_info_t *chunk_info;
H5D_chunk_ud_t udata;
+ H5SL_node_t *chunk_node;
hssize_t select_npoints;
hssize_t chunk_npoints;
@@ -2611,7 +2634,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
local_info_array[i].chunk_info = *chunk_info;
local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block;
local_info_array[i].num_writers = 0;
- local_info_array[i].owner = mpi_rank;
+ local_info_array[i].owners.previous_owner = local_info_array[i].owners.new_owner = mpi_rank;
local_info_array[i].buf = NULL;
if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0)
@@ -2629,6 +2652,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Redistribute shared chunks to new owners as necessary */
if (io_info->op_type == H5D_IO_OP_WRITE) {
+ size_t shared_chunks_info_array_num_entries = 0;
+ size_t num_send_requests = 0;
+
if (num_chunks_selected)
if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
@@ -2638,10 +2664,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (H5D__mpio_array_gatherv(local_info_array, num_chunks_selected, sizeof(*local_info_array),
(void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
- true, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
+ false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array")
- for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < shared_chunks_info_array_num_entries;) {
+#if 0
+ for (i = 0, num_chunks_selected = 0; i < shared_chunks_info_array_num_entries;) {
H5D_filtered_collective_io_info_t chunk_entry;
haddr_t chunk_addr = shared_chunks_info_array[i].old_chunk.offset;
size_t total_io_size = 0;
@@ -2760,26 +2787,124 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
mem_iter_init = FALSE;
} /* end else */
} /* end for */
+#endif
/* Rank 0 redistributes any shared chunks to new owners as necessary */
if (mpi_rank == 0) {
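+ /* send_counts and send_displacements describe, in bytes, the segment of the
+  * gathered chunk list that will be scattered back to each rank
+  */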
+ if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer")
+
+ if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*send_displacements))))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer")
+
+ for (i = 0; i < shared_chunks_info_array_num_entries;) {
+ H5D_filtered_collective_io_info_t chunk_entry;
+ haddr_t last_seen_addr = shared_chunks_info_array[i].old_chunk.offset;
+ size_t set_begin_index = i;
+ size_t total_io_size = 0;
+ size_t max_io_size = 0;
+ size_t num_writers = 0;
+ int new_chunk_owner = 0;
+
+ /* Process each set of duplicate entries caused by multiple processes writing to the same chunk */
+ do {
+ chunk_entry = shared_chunks_info_array[i];
- }
+ send_counts[chunk_entry.owners.previous_owner] += sizeof(chunk_entry);
- /* Release old list */
- if (local_info_array)
- H5MM_free(local_info_array);
+ /* Add this chunk entry's I/O size to the running total */
+ total_io_size += chunk_entry.io_size;
- /* Local info list becomes modified (redistributed) chunk list */
- local_info_array = shared_chunks_info_array;
+ /* The process which is writing the most data to the chunk
+  * becomes the chunk's new owner
+  */
+ if (chunk_entry.io_size > max_io_size) {
+ max_io_size = chunk_entry.io_size;
+ new_chunk_owner = chunk_entry.owners.previous_owner;
+ }
+
+ num_writers++;
+ } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == last_seen_addr);
+
+ /* Set the new owner and number of writers for all of this chunk's entries */
+ for (; set_begin_index < i; set_begin_index++) {
+ shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner;
+ shared_chunks_info_array[set_begin_index].num_writers = num_writers;
+ } /* end for */
+ } /* end for */
+
+ /* Sort the list in order of previous owner so that each original owner of a chunk
+  * entry gets that entry back, with its possibly updated "new_owner" field
+  */
+ HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries,
+ sizeof(*shared_chunks_info_array), H5D__cmp_filtered_collective_io_info_entry_owner);
+
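+ /* Compute the byte displacement of each rank's segment within the sorted list */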
+ send_displacements[0] = 0;
+ for (i = 1; i < (size_t) mpi_size; i++)
+ send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1];
+ } /* end if */
+
+ /* Scatter the segments of the list back to each process */
+ if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts,
+ send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * sizeof(*local_info_array),
+ MPI_BYTE, 0, io_info->comm)))
+ HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code)
/* Now that the chunks have been redistributed, each process must send its modification data
* to the new owners of any of the chunks it previously possessed
*/
for (i = 0; i < num_chunks_selected; i++) {
- if (mpi_rank != local_info_array[i].owner) {
+ if (mpi_rank != local_info_array[i].owners.new_owner) {
+ H5D_filtered_collective_io_info_t chunk_entry = local_info_array[i];
+ unsigned char *mod_data_p = NULL;
+ hssize_t iter_nelmts;
+ size_t mod_data_size;
+
+ /* Determine size of serialized chunk file dataspace, plus the size of
+ * the data being written
+ */
+ if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size")
- }
+ if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
+
+ mod_data_size += (size_t) iter_nelmts * type_info->src_type_size;
+
+ if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size)))
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
+
+ /* Serialize the chunk's file dataspace into the buffer */
+ mod_data_p = mod_data;
+ if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace")
+
+ /* Initialize iterator for memory selection */
+ if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
+ mem_iter_init = TRUE;
+
+ /* Collect the modification data into the buffer */
+ if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
+ (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
+ HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer")
+
+ /* Send modification data to new owner */
+ H5_CHECK_OVERFLOW(mod_data_size, size_t, int)
+ H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int)
+ if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, chunk_entry.owners.new_owner,
+ (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
+ HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
+
+ if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator")
+ mem_iter_init = FALSE;
+
+ if (mod_data) {
+ H5MM_free(mod_data);
+ mod_data = NULL;
+ } /* end if */
+ } /* end if */
} /* end for */
/* Wait for all async send requests to complete before returning */
@@ -2790,13 +2915,17 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
H5_CHECK_OVERFLOW(num_send_requests, size_t, int);
if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses)))
HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
- }
+ } /* end if */
} /* end if */
*chunk_list = local_info_array;
*num_entries = num_chunks_selected;
done:
+ if (send_counts)
+ H5MM_free(send_counts);
+ if (send_displacements)
+ H5MM_free(send_displacements);
if (send_requests)
H5MM_free(send_requests);
if (send_statuses)
@@ -2865,12 +2994,14 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun
base_buf = chunk_list[0].buf;
for (i = 0; i < num_entries; i++) {
- /* Set up the offset in the file, the length of the chunk data, and the relative
- * displacement of the chunk data write buffer
- */
- file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
- length_array[i] = (int) chunk_list[i].new_chunk.length;
- write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
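+ /* Only chunks which this process still owns after redistribution are included in its portion of the collective write */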
+ if (chunk_list[i].owners.previous_owner == chunk_list[i].owners.new_owner) {
+ /* Set up the offset in the file, the length of the chunk data, and the relative
+ * displacement of the chunk data write buffer
+ */
+ file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset;
+ length_array[i] = (int) chunk_list[i].new_chunk.length;
+ write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf;
+ }
} /* end for */
/* Create memory MPI type */