summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/H5Dmpio.c197
1 files changed, 158 insertions, 39 deletions
diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c
index 5b415c5..4f839a8 100644
--- a/src/H5Dmpio.c
+++ b/src/H5Dmpio.c
@@ -93,29 +93,111 @@ typedef struct H5D_chunk_addr_info_t {
H5D_chunk_info_t chunk_info;
} H5D_chunk_addr_info_t;
-/* Information about a single chunk when performing collective filtered I/O */
+/*
+ * Information about a single chunk when performing collective filtered I/O. All
+ * of the fields of one of these structs are initialized at the start of collective
+ * filtered I/O in the function H5D__construct_filtered_io_info_list().
+ *
+ * This struct's fields are as follows:
+ *
+ * index - The "Index" of the chunk in the dataset. The index of a chunk is used during
+ * the collective re-insertion of chunks into the chunk index after the collective
+ * I/O has been performed.
+ *
+ * scaled - The scaled coordinates of the chunk in the dataset's file dataspace. The
+ * coordinates are used in both the collective re-allocation of space in the file
+ * and the collective re-insertion of chunks into the chunk index after the collective
+ * I/O has been performed.
+ *
+ * full_overwrite - A flag which determines whether or not a chunk needs to be read from the
+ * file when being updated. If a chunk is being fully overwritten (the entire
+ * extent is selected in its file dataspace), then it is not necessary to
+ * read the chunk from the file. However, if the chunk is not being fully
+ * overwritten, it has to be read from the file in order to update the chunk
+ * without trashing the parts of the chunk that are not selected.
+ *
+ * num_writers - The total number of processors writing to this chunk. This field is used
+ * when the new owner of a chunk is receiving messages, which contain selections in
+ * the chunk and data to update the chunk with, from other processors which have this
+ * chunk selected in the I/O operation. The new owner must know how many processors it
+ * should expect messages from so that it can post an equal number of receive calls.
+ *
+ * io_size - The total size of I/O to this chunk. This field is an accumulation of the size of
+ * I/O to the chunk from each processor which has the chunk selected and is used to
+ * determine the value for the previous full_overwrite flag.
+ *
+ * buf - A pointer which serves the dual purpose of holding either the chunk data which is to be
+ * written to the file or the chunk data which has been read from the file.
+ *
+ * chunk_states - In the case of dataset writes only, this struct is used to track a chunk's size and
+ * address in the file before and after the filtering operation has occurred.
+ *
+ * Its fields are as follows:
+ *
+ * chunk_current - The address in the file and size of this chunk before the filtering
+ * operation. When reading a chunk from the file, this field is used to
+ * read the correct amount of bytes. It is also used when redistributing
+ * shared chunks among processors and as a parameter to the chunk file
+ * space reallocation function.
+ *
+ * new_chunk - The address in the file and size of this chunk after the filtering
+ * operation. This field is relevant when collectively re-allocating space
+ * in the file for all of the chunks written to in the I/O operation, as
+ * their sizes may have changed after their data has been filtered.
+ *
+ * owners - In the case of dataset writes only, this struct is used to manage which single processor
+ * will ultimately write data out to the chunk. It allows the other processors to act according
+ * to the decision and send their selection in the chunk, as well as the data they wish
+ * to update the chunk with, to the processor which is writing to the chunk.
+ *
+ * Its fields are as follows:
+ *
+ * original_owner - The processor which originally had this chunk selected at the beginning of
+ * the collective filtered I/O operation. This field is currently used when
+ * redistributing shared chunks among processors.
+ *
+ * new_owner - The processor which has been selected to perform the write to this chunk.
+ *
+ * async_info - In the case of dataset writes only, this struct is used by the owning processor of the
+ * chunk in order to manage the MPI send and receive calls made between it and all of
+ * the other processors which have this chunk selected in the I/O operation.
+ *
+ * Its fields are as follows:
+ *
+ * receive_requests_array - An array containing one MPI_Request for each of the
+ * asynchronous MPI receive calls the owning processor of this
+ * chunk makes to another processor in order to receive that
+ * processor's chunk modification data and selection in the chunk.
+ *
+ * receive_buffer_array - An array of buffers into which the owning processor of this chunk
+ * will store chunk modification data and the selection in the chunk
+ * received from another processor.
+ *
+ * num_receive_requests - The number of entries in the receive_request_array and
+ * receive_buffer_array fields.
+ */
typedef struct H5D_filtered_collective_io_info_t {
- hsize_t index; /* "Index" of chunk in dataset */
- hsize_t scaled[H5O_LAYOUT_NDIMS]; /* Scaled coordinates of chunk (in file dataset's dataspace) */
- hbool_t full_overwrite; /* Whether or not this chunk is being fully overwritten */
- size_t num_writers; /* Total number of processes writing to this chunk */
- size_t io_size; /* Size of the I/O to this chunk */
- void *buf; /* Chunk data to be written to file/that has been read from file*/
+ hsize_t index;
+ hsize_t scaled[H5O_LAYOUT_NDIMS];
+ hbool_t full_overwrite;
+ size_t num_writers;
+ size_t io_size;
+ void *buf;
struct {
- H5F_block_t chunk_current; /* The address in the file and size of this chunk before the I/O and filtering operations */
- H5F_block_t new_chunk; /* The address in the file and size of this chunk after the I/O and filtering operations */
+ H5F_block_t chunk_current;
+ H5F_block_t new_chunk;
} chunk_states;
struct {
- int original_owner; /* The process which originally had this chunk selected in the I/O operation */
- int new_owner; /* The process which the chunk has been re-assigned to */
+ int original_owner;
+ int new_owner;
} owners;
struct {
- MPI_Request *receive_requests_array; /* Array of receive requests posted to receive chunk modification data from other processes */
- unsigned char **receive_buffer_array; /* Array of receive buffers to store chunk modification data from other processes */
- int num_receive_requests; /* Number of entries in the receive_requests_array and receive_buffer_array arrays */
+ MPI_Request *receive_requests_array;
+ unsigned char **receive_buffer_array;
+ int num_receive_requests;
} async_info;
} H5D_filtered_collective_io_info_t;
@@ -346,16 +428,26 @@ done:
/*-------------------------------------------------------------------------
* Function: H5D__mpio_array_gatherv
*
- * Purpose: Given arrays by MPI ranks, gathers them into a single array
- * which is either gathered to the rank specified by root when
- * allgather is false, or is distributed back to all ranks
- * when allgather is true. The number of processes
- * participating in the gather operation should be specified
- * for nprocs and the MPI communicator to use should be
- * specified for comm. If the sort_func argument is
- * specified, the list is sorted before being returned.
+ * Purpose: Given an array, specified in local_array, by each processor
+ * calling this function, gathers each array into a single
+ * array which is then either gathered to the processor
+ * specified by root, when allgather is false, or is
+ * distributed back to all processors when allgather is true.
+ *
+ * The size of each entry and number of entries in the array
+ * contributed by an individual processor should be specified
+ * in array_entry_size and local_array_num_entries,
+ * respectively.
*
- * If allgather is specified as true, root is ignored.
+ * The number of processors participating in the gather
+ * operation should be specified for nprocs.
+ *
+ * The MPI communicator to use should be specified for comm.
+ *
+ * If the sort_func argument is supplied, the array is sorted
+ * before the function returns.
+ *
+ * Note: if allgather is specified as true, root is ignored.
*
* Return: Non-negative on success/Negative on failure
*
@@ -371,9 +463,9 @@ H5D__mpio_array_gatherv(void *local_array, size_t local_array_num_entries,
{
size_t gathered_array_num_entries = 0; /* The size of the newly-constructed array */
size_t i;
- void *gathered_array = NULL; /* The newly-constructed array returned to the caller */
- int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */
- int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
+ void *gathered_array = NULL; /* The newly-constructed array returned to the caller */
+ int *receive_counts_array = NULL; /* Array containing number of entries each process is contributing */
+ int *displacements_array = NULL; /* Array of displacements where each process places its data in the final array */
int mpi_code;
int sendcount;
herr_t ret_value = SUCCEED;
@@ -836,17 +928,17 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
if (io_info->op_type == H5D_IO_OP_READ) {
if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
- }
+ } /* end if */
else {
if(H5D__link_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish filtered linked chunk MPI-IO")
- }
- }
+ } /* end else */
+ } /* end if */
else {
/* Perform unfiltered link chunk collective IO */
if(H5D__link_chunk_collective_io(io_info, type_info, fm, sum_chunk, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish linked chunk MPI-IO")
- }
+ } /* end else */
break;
case H5D_MULTI_CHUNK_IO: /* direct request to do multi-chunk IO */
@@ -855,12 +947,12 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
if(io_info->dset->shared->dcpl_cache.pline.nused > 0) {
if(H5D__multi_chunk_filtered_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple filtered chunk MPI-IO")
- }
+ } /* end if */
else {
/* Perform unfiltered multi chunk collective IO */
if(H5D__multi_chunk_collective_io(io_info, type_info, fm, dx_plist) < 0)
HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "couldn't finish optimized multiple chunk MPI-IO")
- }
+ } /* end else */
break;
} /* end switch */
@@ -1703,8 +1795,12 @@ done:
/*-------------------------------------------------------------------------
* Function: H5D__multi_chunk_filtered_collective_io
*
- * Purpose: To do filtered collective IO per chunk to save on memory,
- * as opposed to collective IO of every chunk at once
+ * Purpose: To do filtered collective IO iteratively to save on memory.
+ * While link_chunk_filtered_collective_io will construct and
+ * work on a list of all of the chunks selected in the IO
+ * operation at once, this function works iteratively on a set
+ * of chunks at a time; at most one chunk per rank per
+ * iteration.
*
* 1. Construct a list of selected chunks in the collective IO
* operation
@@ -2716,11 +2812,28 @@ done:
* to preserve file integrity after the write by ensuring
* that any shared chunks are only modified by one process.
*
- * The current implementation simply hands the list off to
- * rank 0, which then scans the list and for each shared
- * chunk, it redistributes the chunk to the process writing
- * to the chunk which currently has the least amount of chunks
- * assigned to it.
+ * The current implementation follows this 3-phase process:
+ *
+ * - Collect everyone's list of chunks into one large list,
+ * sort the list in increasing order of chunk offset in the
+ * file and hand the list off to rank 0
+ *
+ * - Rank 0 scans the list looking for matching runs of chunk
+ * offset in the file (corresponding to a shared chunk which
+ * has been selected by more than one rank in the I/O
+ * operation) and for each shared chunk, it redistributes
+ * the chunk to the process writing to the chunk which
+ * currently has the least amount of chunks assigned to it
+ * by modifying the "new_owner" field in each of the list
+ * entries corresponding to that chunk
+ *
+ * - After the chunks have been redistributed, rank 0 re-sorts
+ * the list in order of previous owner so that each rank
+ * will get back exactly the array that they contributed to
+ * the redistribution operation, with the "new_owner" field
+ * of each chunk they are modifying having possibly been
+ * modified. Rank 0 then scatters each segment of the list
+ * back to its corresponding rank
*
* Return: Non-negative on success/Negative on failure
*
@@ -2768,6 +2881,9 @@ H5D__chunk_redistribute_shared_chunks(const H5D_io_info_t *io_info, const H5D_ty
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
+ /* Gather every rank's list of chunks to rank 0 to allow it to perform the redistribution operation. After this
+ * call, the gathered list will initially be sorted in increasing order of chunk offset in the file.
+ */
if (H5D__mpio_array_gatherv(local_chunk_array, *local_chunk_array_num_entries, sizeof(*local_chunk_array),
(void **) &shared_chunks_info_array, &shared_chunks_info_array_num_entries, mpi_size,
false, 0, io_info->comm, H5D__cmp_filtered_collective_io_info_entry) < 0)
@@ -3215,6 +3331,9 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
chunk_entry->async_info.receive_requests_array, MPI_STATUSES_IGNORE)))
HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
+ /* For each asynchronous receive call previously posted, receive the chunk modification
+ * buffer from another rank and update the chunk data
+ */
for (i = 0; i < (size_t) chunk_entry->async_info.num_receive_requests; i++) {
const unsigned char *mod_data_p;