diff options
-rw-r--r-- | src/H5Dmpio.c | 179 |
1 files changed, 155 insertions, 24 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index e6fbb6a..622cdbb 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -103,7 +103,16 @@ typedef struct H5D_filtered_collective_io_info_t { hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */ size_t io_size; /* Size of the I/O to this chunk */ size_t num_writers; /* Total number of processes writing to this chunk */ + + struct { + int previous_owner; + int new_owner; + } owners; + +#if 0 int owner; /* Process which will be writing to this chunk */ +#endif + void *buf; /* Chunk data to be written to file/that has been read from file*/ } H5D_filtered_collective_io_info_t; @@ -1353,8 +1362,9 @@ H5D__link_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_in * then re-filtering the chunk. */ for (i = 0; i < chunk_list_num_entries; i++) - if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") + if (mpi_rank == chunk_list[i].owners.new_owner) + if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't process chunk entry") /* Gather the new chunk sizes to all processes for a collective reallocation * of the chunks in the file. @@ -1835,7 +1845,7 @@ H5D__multi_chunk_filtered_collective_io(H5D_io_info_t *io_info, const H5D_type_i */ for (i = 0; i < max_num_chunks; i++) { /* Check if this process has a chunk to work on for this iteration */ - hbool_t have_chunk_to_process = i < chunk_list_num_entries; + hbool_t have_chunk_to_process = (i < chunk_list_num_entries) && (mpi_rank == chunk_list[i].owners.new_owner); if (have_chunk_to_process) if (H5D__filtered_collective_chunk_entry_io(&chunk_list[i], io_info, type_info) < 0) @@ -2181,6 +2191,19 @@ H5D__cmp_filtered_collective_io_info_entry(const void *filtered_collective_io_in FUNC_LEAVE_NOAPI(H5F_addr_cmp(addr1, addr2)) } /* end H5D__cmp_filtered_collective_io_info_entry() */ +static int +H5D__cmp_filtered_collective_io_info_entry_owner(const void *filtered_collective_io_info_entry1, const void *filtered_collective_io_info_entry2) +{ + int owner1 = -1, owner2 = -1; + + FUNC_ENTER_STATIC_NOERR + + owner1 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry1)->owners.previous_owner; + owner2 = ((const H5D_filtered_collective_io_info_t *) filtered_collective_io_info_entry2)->owners.previous_owner; + + FUNC_LEAVE_NOAPI(owner1 - owner2) +} /* end H5D__cmp_filtered_collective_io_info_entry_owner() */ + /*------------------------------------------------------------------------- * Function: H5D__sort_chunk @@ -2565,14 +2588,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ H5D_filtered_collective_io_info_t *shared_chunks_info_array = NULL; /* The list of all chunks selected in the operation by all processes */ H5S_sel_iter_t *mem_iter = NULL; /* Memory iterator for H5D__gather_mem */ unsigned char *mod_data = NULL; /* Chunk modification data sent by a process to a chunk's owner */ - H5SL_node_t *chunk_node; MPI_Request *send_requests = NULL; /* Array of MPI_Isend chunk modification data send requests */ MPI_Status *send_statuses = NULL; /* Array of MPI_Isend chunk modification send statuses */ hbool_t mem_iter_init = FALSE; - size_t num_send_requests = 0; size_t num_chunks_selected; - size_t shared_chunks_info_array_num_entries; size_t i; + int *send_counts = NULL; + int *send_displacements = NULL; int mpi_rank, mpi_size, mpi_code; herr_t ret_value = SUCCEED; @@ -2594,6 +2616,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ if ((num_chunks_selected = H5SL_count(fm->sel_chunks))) { H5D_chunk_info_t *chunk_info; H5D_chunk_ud_t udata; + H5SL_node_t *chunk_node; hssize_t select_npoints; hssize_t chunk_npoints; @@ -2611,7 +2634,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ local_info_array[i].chunk_info = *chunk_info; local_info_array[i].old_chunk = local_info_array[i].new_chunk = udata.chunk_block; local_info_array[i].num_writers = 0; - local_info_array[i].owner = mpi_rank; + local_info_array[i].owners.previous_owner = local_info_array[i].owners.new_owner = mpi_rank; local_info_array[i].buf = NULL; if ((select_npoints = H5S_GET_SELECT_NPOINTS(chunk_info->mspace)) < 0) @@ -2629,6 +2652,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ /* Redistribute shared chunks to new owners as necessary */ if (io_info->op_type == H5D_IO_OP_WRITE) { + size_t shared_chunks_info_array_num_entries = 0; + size_t num_send_requests = 0; + if (num_chunks_selected) if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer") @@ -2638,10 +2664,11 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ if (H5D__mpio_array_gatherv(local_info_array, num_chunks_selected, sizeof(*local_info_array), (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size, - true, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) + false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) HGOTO_ERROR(H5E_DATASET, H5E_CANTGATHER, FAIL, "couldn't gather array") - for (i = 0, num_chunks_selected = 0, num_send_requests = 0; i < shared_chunks_info_array_num_entries;) { +#if 0 + for (i = 0, num_chunks_selected = 0; i < shared_chunks_info_array_num_entries;) { H5D_filtered_collective_io_info_t chunk_entry; haddr_t chunk_addr = shared_chunks_info_array[i].old_chunk.offset; size_t total_io_size = 0; @@ -2760,26 +2787,124 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ mem_iter_init = FALSE; } /* end else */ } /* end for */ +#endif /* Rank 0 redistributes any shared chunks to new owners as necessary */ if (mpi_rank == 0) { + if (NULL == (send_counts = (int *) H5MM_calloc((size_t) mpi_size * sizeof(*send_counts)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send counts buffer") + + if (NULL == (send_displacements = (int *) H5MM_malloc((size_t) mpi_size * sizeof(*send_displacements)))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "unable to allocate send displacements buffer") + + for (i = 0; i < shared_chunks_info_array_num_entries;) { + H5D_filtered_collective_io_info_t chunk_entry; + haddr_t last_seen_addr = shared_chunks_info_array[i].old_chunk.offset; + size_t set_begin_index = i; + size_t total_io_size = 0; + size_t max_io_size = 0; + size_t num_writers = 0; + int new_chunk_owner = 0; + + /* Process each set of duplicate entries caused by another process writing to the same chunk */ + do { + chunk_entry = shared_chunks_info_array[i]; - } + send_counts[chunk_entry.owners.previous_owner] += sizeof(chunk_entry); - /* Release old list */ - if (local_info_array) - H5MM_free(local_info_array); + /* Add this chunk entry's I/O size to the running total */ + total_io_size += chunk_entry.io_size; - /* Local info list becomes modified (redistributed) chunk list */ - local_info_array = shared_chunks_info_array; + /* The new owner of the chunk is determined by the process + * which is writing the most data to the chunk + */ + if (chunk_entry.io_size > max_io_size) { + max_io_size = chunk_entry.io_size; + new_chunk_owner = chunk_entry.owners.previous_owner; + } + + num_writers++; + } while (++i < shared_chunks_info_array_num_entries && shared_chunks_info_array[i].old_chunk.offset == last_seen_addr); + + /* Set all of the chunk entries' "new_owner" fields */ + for (; set_begin_index < i; set_begin_index++) { + shared_chunks_info_array[set_begin_index].owners.new_owner = new_chunk_owner; + shared_chunks_info_array[set_begin_index].num_writers = num_writers; + } /* end for */ + } /* end for */ + + /* Sort the new list's in order of previous owner so that each original owner of a chunk + * entry gets that entry back, with the possibly newly-modified "new_owner" field + */ + HDqsort(shared_chunks_info_array, shared_chunks_info_array_num_entries, + sizeof(*shared_chunks_info_array), H5D__cmp_filtered_collective_io_info_entry_owner); + + send_displacements[0] = 0; + for (i = 1; i < (size_t) mpi_size; i++) + send_displacements[i] = send_displacements[i - 1] + send_counts[i - 1]; + } /* end if */ + + /* Scatter the segments of the list back to each process */ + if (MPI_SUCCESS != (mpi_code = MPI_Scatterv(shared_chunks_info_array, send_counts, + send_displacements, MPI_BYTE, local_info_array, num_chunks_selected * sizeof(*local_info_array), + MPI_BYTE, 0, io_info->comm))) + HMPI_GOTO_ERROR(FAIL, "unable to scatter shared chunks info buffer", mpi_code) /* Now that the chunks have been redistributed, each process must send its modification data * to the new owners of any of the chunks it previously possessed */ for (i = 0; i < num_chunks_selected; i++) { - if (mpi_rank != local_info_array[i].owner) { + if (mpi_rank != local_info_array[i].owners.new_owner) { + H5D_filtered_collective_io_info_t chunk_entry = local_info_array[i]; + unsigned char *mod_data_p = NULL; + hssize_t iter_nelmts; + size_t mod_data_size; + + /* Determine size of serialized chunk file dataspace, plus the size of + * the data being written + */ + if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data, &mod_data_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to get encoded dataspace size") - } + if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid") + + mod_data_size += (size_t) iter_nelmts * type_info->src_type_size; + + if (NULL == (mod_data = (unsigned char *) H5MM_malloc(mod_data_size))) + HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer") + + /* Serialize the chunk's file dataspace into the buffer */ + mod_data_p = mod_data; + if (H5S_encode(chunk_entry.chunk_info.fspace, &mod_data_p, &mod_data_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to encode dataspace") + + /* Intialize iterator for memory selection */ + if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information") + mem_iter_init = TRUE; + + /* Collect the modification data into the buffer */ + if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter, + (size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p)) + HGOTO_ERROR(H5E_IO, H5E_CANTGATHER, FAIL, "couldn't gather from write buffer") + + /* Send modification data to new owner */ + H5_CHECK_OVERFLOW(mod_data_size, size_t, int) + H5_CHECK_OVERFLOW(chunk_entry.chunk_info.index, hsize_t, int) + if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, chunk_entry.owners.new_owner, + (int) chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++]))) + HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) + + if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release memory selection iterator") + mem_iter_init = FALSE; + + if (mod_data) { + H5MM_free(mod_data); + mod_data = NULL; + } /* end if */ + } /* end if */ } /* end for */ /* Wait for all async send requests to complete before returning */ @@ -2790,13 +2915,17 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ H5_CHECK_OVERFLOW(num_send_requests, size_t, int); if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int) num_send_requests, send_requests, send_statuses))) HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) - } + } /* end if */ } /* end if */ *chunk_list = local_info_array; *num_entries = num_chunks_selected; done: + if (send_counts) + H5MM_free(send_counts); + if (send_displacements) + H5MM_free(send_displacements); if (send_requests) H5MM_free(send_requests); if (send_statuses) @@ -2865,12 +2994,14 @@ H5D__mpio_filtered_collective_write_type(H5D_filtered_collective_io_info_t *chun base_buf = chunk_list[0].buf; for (i = 0; i < num_entries; i++) { - /* Set up the offset in the file, the length of the chunk data, and the relative - * displacement of the chunk data write buffer - */ - file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset; - length_array[i] = (int) chunk_list[i].new_chunk.length; - write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf; + if (chunk_list[i].owners.previous_owner == chunk_list[i].owners.new_owner) { + /* Set up the offset in the file, the length of the chunk data, and the relative + * displacement of the chunk data write buffer + */ + file_offset_array[i] = (MPI_Aint) chunk_list[i].new_chunk.offset; + length_array[i] = (int) chunk_list[i].new_chunk.length; + write_buf_array[i] = (MPI_Aint) chunk_list[i].buf - (MPI_Aint) base_buf; + } } /* end for */ /* Create memory MPI type */ |