diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/H5Dmpio.c | 197 |
1 files changed, 158 insertions, 39 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 5b415c5..4f839a8 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -93,29 +93,111 @@ typedef struct H5D_chunk_addr_info_t { H5D_chunk_info_t chunk_info; } H5D_chunk_addr_info_t; -/* Information about a single chunk when performing collective filtered I/O */ +/* + * Information about a single chunk when performing collective filtered I/O. All + * of the fields of one of these structs are initialized at the start of collective + * filtered I/O in the function H5D__construct_filtered_io_info_list(). + * + * This struct's fields are as follows: + * + * index - The "Index" of the chunk in the dataset. The index of a chunk is used during + * the collective re-insertion of chunks into the chunk index after the collective + * I/O has been performed. + * + * scaled - The scaled coordinates of the chunk in the dataset's file dataspace. The + * coordinates are used in both the collective re-allocation of space in the file + * and the collective re-insertion of chunks into the chunk index after the collective + * I/O has been performed. + * + * full_overwrite - A flag which determines whether or not a chunk needs to be read from the + * file when being updated. If a chunk is being fully overwritten (the entire + * extent is selected in its file dataspace), then it is not necessary to + * read the chunk from the file. However, if the chunk is not being fully + * overwritten, it has to be read from the file in order to update the chunk + * without trashing the parts of the chunk that are not selected. + * + * num_writers - The total number of processors writing to this chunk. This field is used + * when the new owner of a chunk is receiving messages, which contain selections in + * the chunk and data to update the chunk with, from other processors which have this + * chunk selected in the I/O operation. The new owner must know how many processors it + * should expect messages from so that it can post an equal number of receive calls. + * + * io_size - The total size of I/O to this chunk. This field is an accumulation of the size of + * I/O to the chunk from each processor which has the chunk selected and is used to + * determine the value for the previous full_overwrite flag. + * + * buf - A pointer which serves the dual purpose of holding either the chunk data which is to be + * written to the file or the chunk data which has been read from the file. + * + * chunk_states - In the case of dataset writes only, this struct is used to track a chunk's size and + * address in the file before and after the filtering operation has occurred. + * + * Its fields are as follows: + * + * chunk_current - The address in the file and size of this chunk before the filtering + * operation. When reading a chunk from the file, this field is used to + * read the correct amount of bytes. It is also used when redistributing + * shared chunks among processors and as a parameter to the chunk file + * space reallocation function. + * + * new_chunk - The address in the file and size of this chunk after the filtering + * operation. This field is relevant when collectively re-allocating space + * in the file for all of the chunks written to in the I/O operation, as + * their sizes may have changed after their data has been filtered. + * + * owners - In the case of dataset writes only, this struct is used to manage which single processor + * will ultimately write data out to the chunk. It allows the other processors to act according + * to the decision and send their selection in the chunk, as well as the data they wish + * to update the chunk with, to the processor which is writing to the chunk. + * + * Its fields are as follows: + * + * original_owner - The processor which originally had this chunk selected at the beginning of + * the collective filtered I/O operation. This field is currently used when + * redistributing shared chunks among processors. + * + * new_owner - The processor which has been selected to perform the write to this chunk. + * + * async_info - In the case of dataset writes only, this struct is used by the owning processor of the + * chunk in order to manage the MPI send and receive calls made between it and all of + * the other processors which have this chunk selected in the I/O operation. + * + * Its fields are as follows: + * + * receive_requests_array - An array containing one MPI_Request for each of the + * asynchronous MPI receive calls the owning processor of this + * chunk makes to another processor in order to receive that + * processor's chunk modification data and selection in the chunk. + * + * receive_buffer_array - An array of buffers into which the owning processor of this chunk + * will store chunk modification data and the selection in the chunk + * received from another processor. + * + * num_receive_requests - The number of entries in the receive_request_array and + * receive_buffer_array fields. + */ typedef struct H5D_filtered_collective_io_info_t { - hsize_t index; /* "Index" of chunk in dataset */ - hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */ - hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */ - size_t num_writers; /* Total number of processes writing to this chunk */ - size_t io_size; /* Size of the I/O to this chunk */ - void *buf; /* Chunk data to be written to file/that has been read from file*/ + hsize_t index; + hsize_t scaled[H5O_LAYOUT_NDIMS]; + hbool_t full_overwrite; + size_t num_writers; + size_t io_size; + void *buf; struct { - H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */ - H5F_block_t new_chunk; /* The address in the file and size of this chunk after the I/O and filtering operations */ + H5F_block_t chunk_current; + H5F_block_t new_chunk; } chunk_states; struct { - int original_owner; /* The process which originally had this chunk selected in the I/O operation */ - int new_owner; /* The process which the chunk has been re-assigned to */ + int original_owner; + int new_owner; } owners; struct { - MPI_Request *receive_requests_array; /* Array of receive requests posted to receive chunk modification data from other processes */ - unsigned char **receive_buffer_array; /* Array of receive buffers to store chunk modification data from other processes */ - int num_receive_requests; /* Number of entries in the receive_requests_array and receive_buffer_array arrays */ + MPI_Request *receive_requests_array; + unsigned char **receive_buffer_array; + int num_receive_requests; } async_info; } H5D_filtered_collective_io_info_t; @@ -346,16 +428,26 @@ done: /*------------------------------------------------------------------------- * Function: H5D__mpio_array_gatherv * - * Purpose: Given arrays by MPI ranks, gathers them into a single array - * which is either gathered to the rank specified by root when - * allgather is false, or is distributed back to all ranks - * when allgather is true. The number of processes - * participating in the gather operation should be specified - * for nprocs and the MPI communicator to use should be - * specified for comm. If the sort_func argument is - * specified, the list is sorted before being returned. + * Purpose: Given an array, specified in local_array, by each processor + * calling this function, gathers each array into a single + * array which is then either gathered to the processor + * specified by root, when allgather is false, or is + * distributed back to all processors when allgather is true. + * + * The size of each entry and number of entries in the array + * contributed by an individual processor should be specified + * in array_entry_size and local_array_num_entries, + * respectively. * - * If allgather is specified as true, root is ignored. + * The number of processors participating in the gather + * operation should be specified for nprocs. + * + * The MPI communicator to use should be specified for comm. + * + * If the sort_func argument is supplied, the array is sorted + * before the function returns. + * + * Note: if allgather is specified as true, root is ignored. * * Return: Non-negative on success/Negative on failure * @@ -371,9 +463,9 @@ H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries, { size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */ size_t i; - void *gathered_array = NULL; /* The newly-constructed array returned to the caller */ - int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */ - int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */ + void *gathered_array = NULL; /* The newly-constructed array returned to the caller */ + int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */ + int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */ int mpi_code; int sendcount; herr_t ret_value = SUCCEED; @@ -836,17 +928,17 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf if (io_info->op_type == H5D_IO_OP_READ) { if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") - } + } /* end if */ else { if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO") - } - } + } /* end else */ + } /* end if */ else { /* Perform unfiltered link chunk collective IO */ if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO") - } + } /* end else */ break; case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */ @@ -855,12 +947,12 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf if(io_info->dset->shared->dcpl_cache.pline.nused > 0) { if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO") - } + } /* end if */ else { /* Perform unfiltered multi chunk collective IO */ if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0) HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO") - } + } /* end else */ break; } /* end switch */ @@ -1703,8 +1795,12 @@ done: /*------------------------------------------------------------------------- * Function: H5D__multi_chunk_filtered_collective_io * - * Purpose: To do filtered collective IO per chunk to save on memory, - * as opposed to collective IO of every chunk at once + * Purpose: To do filtered collective IO iteratively to save on memory. + * While link_chunk_filtered_collective_io will construct and + * work on a list of all of the chunks selected in the IO + * operation at once, this function works iteratively on a set + * of chunks at a time; at most one chunk per rank per + * iteration. * * 1. Construct a list of selected chunks in the collective IO * operation @@ -2716,11 +2812,28 @@ done: * to preserve file integrity after the write by ensuring * that any shared chunks are only modified by one process. * - * The current implementation simply hands the list off to - * rank 0, which then scans the list and for each shared - * chunk, it redistributes the chunk to the process writing - * to the chunk which currently has the least amount of chunks - * assigned to it. + * The current implementation follows this 3-phase process: + * + * - Collect everyone's list of chunks into one large list, + * sort the list in increasing order of chunk offset in the + * file and hand the list off to rank 0 + * + * - Rank 0 scans the list looking for matching runs of chunk + * offset in the file (corresponding to a shared chunk which + * has been selected by more than one rank in the I/O + * operation) and for each shared chunk, it redistributes + * the chunk to the process writing to the chunk which + * currently has the least amount of chunks assigned to it + * by modifying the "new_owner" field in each of the list + * entries corresponding to that chunk + * + * - After the chunks have been redistributed, rank 0 re-sorts + * the list in order of previous owner so that each rank + * will get back exactly the array that they contributed to + * the redistribution operation, with the "new_owner" field + * of each chunk they are modifying having possibly been + * modified. Rank 0 then scatters each segment of the list + * back to its corresponding rank * * Return: Non-negative on success/Negative on failure * @@ -2768,6 +2881,9 @@ H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_ty if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t)))) HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator") + /* Gather every rank's list of chunks to rank 0 to allow it to perform the redistribution operation. After this + * call, the gathered list will initially be sorted in increasing order of chunk offset in the file. + */ if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(*local_chunk_array), (void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size, false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0) @@ -3215,6 +3331,9 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE))) HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code) + /* For each asynchronous receive call previously posted, receive the chunk modification + * buffer from another rank and update the chunk data + */ for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) { const unsigned char *mod_data_p; |