From 5a3e6cf0800219816b018baf50b7133baa73bc68 Mon Sep 17 00:00:00 2001
From: Vailin Choi <vchoi@hdfgroup.org>
Date: Tue, 18 Nov 2014 18:19:14 -0500
Subject: [svn-r25824] Bring revision #25508 - #25509 from trunk to
 revise_chunks. h5committested.

---
 src/H5Dchunk.c | 242 +++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 202 insertions(+), 40 deletions(-)
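
Reviewer note: the new H5D__chunk_collective_fill() routine below splits the
collected chunk addresses evenly among the MPI ranks and hands any leftover
chunks out round-robin to the lowest ranks.  The standalone C sketch below
mirrors that index arithmetic so it can be checked outside of HDF5; all names
(print_rank_assignment, chunk_info.addr) are illustrative only and not part
of the patch.

    #include <stdio.h>
    #include <stdlib.h>

    /* Show which entries of the (hypothetical) chunk_info.addr[] array a
     * given rank would write, using the same division rule as the patch's
     * H5D__chunk_collective_fill() */
    static void
    print_rank_assignment(size_t num_io, int mpi_size, int mpi_rank)
    {
        size_t base     = num_io / (size_t)mpi_size;  /* Even share per rank */
        size_t leftover = num_io % (size_t)mpi_size;  /* Extras for the lowest ranks */
        size_t i;

        /* The even share: indices [rank*base, rank*base + base) */
        for(i = 0; i < base; i++)
            printf("rank %d writes chunk_info.addr[%lu]\n", mpi_rank,
                   (unsigned long)((size_t)mpi_rank * base + i));

        /* One leftover chunk for ranks 0 .. leftover-1, stored after the
         * evenly divided region at index base * mpi_size + rank */
        if((size_t)mpi_rank < leftover)
            printf("rank %d also writes chunk_info.addr[%lu]\n", mpi_rank,
                   (unsigned long)(base * (size_t)mpi_size + (size_t)mpi_rank));
    }

    int
    main(void)
    {
        int rank;

        /* Example: 10 chunks across 4 ranks -> ranks 0 and 1 write 3 chunks
         * each, ranks 2 and 3 write 2 chunks each */
        for(rank = 0; rank < 4; rank++)
            print_rank_assignment((size_t)10, 4, rank);

        return EXIT_SUCCESS;
    }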

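A second standalone sketch (plain MPI-IO, outside HDF5; the file name and
chunk offsets are made up) illustrates the datatype trick the patch relies
on: an hindexed file type selects each chunk's byte offset in the file,
while an hvector memory type with stride 0 reuses the single fill-value
buffer for every chunk, so one collective write fills all of a rank's
chunks.  In the patch itself the write goes through H5F_block_write() with
the MPI types attached to a copied DXPL rather than calling
MPI_File_write_all() directly.

    #include <mpi.h>
    #include <stdlib.h>
    #include <string.h>

    #define CHUNK_SIZE 1024   /* Hypothetical chunk size in bytes */
    #define NBLOCKS    2      /* Hypothetical number of chunks owned by this rank */

    int
    main(int argc, char *argv[])
    {
        int          lens[NBLOCKS] = {CHUNK_SIZE, CHUNK_SIZE};
        MPI_Aint     disps[NBLOCKS];         /* Byte offsets of this rank's chunks */
        char        *fill_buf;               /* Single fill-value buffer */
        MPI_Datatype file_type, mem_type;
        MPI_File     fh;
        int          rank;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        /* Made-up, non-overlapping chunk addresses for this rank */
        disps[0] = (MPI_Aint)rank * NBLOCKS * CHUNK_SIZE;
        disps[1] = disps[0] + CHUNK_SIZE;

        /* One fill buffer, reused for every chunk */
        fill_buf = (char *)malloc((size_t)CHUNK_SIZE);
        memset(fill_buf, 0, (size_t)CHUNK_SIZE);

        /* File type: one CHUNK_SIZE block of bytes at each chunk address */
        MPI_Type_create_hindexed(NBLOCKS, lens, disps, MPI_BYTE, &file_type);
        MPI_Type_commit(&file_type);

        /* Memory type: the same CHUNK_SIZE bytes repeated NBLOCKS times (stride 0) */
        MPI_Type_create_hvector(NBLOCKS, CHUNK_SIZE, (MPI_Aint)0, MPI_BYTE, &mem_type);
        MPI_Type_commit(&mem_type);

        MPI_File_open(MPI_COMM_WORLD, "fill_demo.bin",
                      MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
        MPI_File_set_view(fh, (MPI_Offset)0, MPI_BYTE, file_type, "native",
                          MPI_INFO_NULL);

        /* Collective write: one mem_type element covers all of this rank's chunks */
        MPI_File_write_all(fh, fill_buf, 1, mem_type, MPI_STATUS_IGNORE);

        MPI_File_close(&fh);
        MPI_Type_free(&file_type);
        MPI_Type_free(&mem_type);
        free(fill_buf);
        MPI_Finalize();
        return 0;
    }
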
diff --git a/src/H5Dchunk.c b/src/H5Dchunk.c
index 12f4db7..bb4f30f 100644
--- a/src/H5Dchunk.c
+++ b/src/H5Dchunk.c
@@ -179,6 +179,13 @@ typedef struct H5D_chunk_file_iter_ud_t {
 #endif /* H5_HAVE_PARALLEL */
 } H5D_chunk_file_iter_ud_t;
 
+#ifdef H5_HAVE_PARALLEL
+/* Information needed to construct a collective I/O operation for filling chunks */
+typedef struct H5D_chunk_coll_info_t {
+    size_t num_io;       /* Number of write operations */
+    haddr_t *addr;       /* Array of file addresses, one per write operation */
+} H5D_chunk_coll_info_t;
+#endif /* H5_HAVE_PARALLEL */
 
 /********************/
 /* Local Prototypes */
@@ -235,7 +242,10 @@ static herr_t H5D__chunk_cache_prune(const H5D_t *dset, hid_t dxpl_id,
     const H5D_dxpl_cache_t *dxpl_cache, size_t size);
 static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, hbool_t new_unfilt_chunk);
 static herr_t H5D__idx_chunk_alloc(const H5D_chk_idx_info_t *idx_info, H5D_chunk_ud_t *udata);
-
+#ifdef H5_HAVE_PARALLEL
+static herr_t H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id,
+    H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf);
+#endif /* H5_HAVE_PARALLEL */
 
 /*********************/
 /* Package Variables */
@@ -3566,8 +3576,8 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
     hsize_t     min_unalloc[H5O_LAYOUT_NDIMS]; /* First chunk in each dimension that is unallocated */
     hsize_t     max_unalloc[H5O_LAYOUT_NDIMS]; /* Last chunk in each dimension that is unallocated */
     hsize_t	chunk_offset[H5O_LAYOUT_NDIMS]; /* Offset of current chunk */
-    size_t      chunk_size;      /* Size of current chunk in bytes, possibly filtered */
     size_t	orig_chunk_size; /* Original size of chunk in bytes */
+    size_t      chunk_size;      /* Actual size of chunk in bytes, possibly filtered */
     unsigned    filter_mask = 0; /* Filter mask for chunks that have them */
     const H5O_layout_t *layout = &(dset->shared->layout);       /* Dataset layout */
     const H5O_pline_t *pline = &(dset->shared->dcpl_cache.pline);    /* I/O pipeline info */
@@ -3580,11 +3590,9 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
     H5D_dxpl_cache_t _dxpl_cache;       /* Data transfer property cache buffer */
     H5D_dxpl_cache_t *dxpl_cache = &_dxpl_cache;   /* Data transfer property cache */
 #ifdef H5_HAVE_PARALLEL
-    MPI_Comm	mpi_comm = MPI_COMM_NULL;	/* MPI communicator for file */
-    int         mpi_rank = (-1); /* This process's rank  */
-    int         mpi_code;       /* MPI return code */
     hbool_t     blocks_written = FALSE; /* Flag to indicate that chunk was actually written */
     hbool_t     using_mpi = FALSE;    /* Flag to indicate that the file is being accessed with an MPI-capable file driver */
+    H5D_chunk_coll_info_t chunk_info; /* Chunk address information for collective I/O */
 #endif /* H5_HAVE_PARALLEL */
     hbool_t	carry;          /* Flag to indicate that chunk increment carrys to higher dimension (sorta) */
     int         space_ndims;    /* Dataset's space rank */
@@ -3597,7 +3605,6 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
     hbool_t     unfilt_edge_chunk_dim[H5O_LAYOUT_NDIMS]; /* Whether there are unfiltered edge chunks at the edge of each dimension */
     hsize_t     edge_chunk_offset[H5O_LAYOUT_NDIMS]; /* Offset of the unfiltered edge chunks at the edge of each dimension */
     unsigned    nunfilt_edge_chunk_dims = 0; /* Number of dimensions on an edge */
-    hid_t       data_dxpl_id;           /* DXPL ID to use for raw data I/O operations */
     herr_t	ret_value = SUCCEED;	/* Return value */
 
     FUNC_ENTER_PACKAGE_TAG(dxpl_id, dset->oloc.addr, FAIL)
@@ -3637,30 +3644,17 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
 #ifdef H5_HAVE_PARALLEL
     /* Retrieve MPI parameters */
     if(H5F_HAS_FEATURE(dset->oloc.file, H5FD_FEAT_HAS_MPI)) {
-        /* Get the MPI communicator */
-        if(MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file)))
-            HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator")
-
-        /* Get the MPI rank */
-        if((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0)
-            HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank")
-
         /* Set the MPI-capable file driver flag */
         using_mpi = TRUE;
 
-        /* Use the internal "independent" DXPL */
-        data_dxpl_id = H5AC_ind_dxpl_id;
+        /* Initialize the chunk address info used for the collective fill at the end */
+        chunk_info.num_io = 0;
+        chunk_info.addr = NULL;
     } /* end if */
-    else {
-#endif  /* H5_HAVE_PARALLEL */
-        /* Use the DXPL we were given */
-        data_dxpl_id = dxpl_id;
-#ifdef H5_HAVE_PARALLEL
-    } /* end else */
 #endif  /* H5_HAVE_PARALLEL */
 
     /* Fill the DXPL cache values for later use */
-    if(H5D__get_dxpl_cache(data_dxpl_id, &dxpl_cache) < 0)
+    if(H5D__get_dxpl_cache(dxpl_id, &dxpl_cache) < 0)
         HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't fill dxpl cache")
 
     /* Calculate the minimum and maximum chunk offsets in each dimension, and
@@ -3715,7 +3709,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
         if(H5D__fill_init(&fb_info, NULL, (H5MM_allocate_t)H5D__chunk_alloc,
                 (void *)pline, (H5MM_free_t)H5D__chunk_xfree, (void *)pline,
                 &dset->shared->dcpl_cache.fill, dset->shared->type,
-                dset->shared->type_id, (size_t)0, orig_chunk_size, data_dxpl_id) < 0)
+                dset->shared->type_id, (size_t)0, orig_chunk_size, dxpl_id) < 0)
             HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize fill buffer info")
         fb_info_init = TRUE;
 
@@ -3848,6 +3842,9 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
                 /* Sanity check */
                 HDassert(should_fill);
                 HDassert(!unfilt_fill_buf);
+#ifdef H5_HAVE_PARALLEL
+                HDassert(!using_mpi);   /* Can't write VL datatypes in parallel currently */
+#endif
 
                 /* Check to make sure the buffer is large enough.  It is
                  * possible (though ill-advised) for the filter to shrink the
@@ -3860,7 +3857,7 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
                 } /* end if */
 
                 /* Fill the buffer with VL datatype fill values */
-                if(H5D__fill_refill_vl(&fb_info, fb_info.elmts_per_buf, data_dxpl_id) < 0)
+                if(H5D__fill_refill_vl(&fb_info, fb_info.elmts_per_buf, dxpl_id) < 0)
                     HGOTO_ERROR(H5E_DATASET, H5E_CANTCONVERT, FAIL, "can't refill fill value buffer")
 
                 /* Check if there are filters which need to be applied to the chunk */
@@ -3912,18 +3909,25 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
 #ifdef H5_HAVE_PARALLEL
                 /* Check if this file is accessed with an MPI-capable file driver */
                 if(using_mpi) {
-                    /* Write the chunks out from only one process */
-                    /* !! Use the internal "independent" DXPL!! -QAK */
-                    if(H5_PAR_META_WRITE == mpi_rank)
-                        if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, data_dxpl_id, *fill_buf) < 0)
-                            HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
+                    /* Collect the addresses of all chunks to be written, so
+                       that they can be written collectively at the end */
+                    /* Allocate/extend the address array whenever it runs out of space */
+                    if(0 == chunk_info.num_io % 1024) {
+                        if(NULL == (chunk_info.addr = (haddr_t *)HDrealloc
+                                    (chunk_info.addr, (chunk_info.num_io + 1024) * sizeof(haddr_t))))
+                            HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "memory allocation failed for chunk addresses");
+                    } /* end if */
+
+                    /* Store the chunk's address for later */
+                    chunk_info.addr[chunk_info.num_io] = udata.addr;
+                    chunk_info.num_io++;
 
-                    /* Indicate that blocks are being written */
+                    /* Indicate that blocks will be written */
                     blocks_written = TRUE;
                 } /* end if */
                 else {
 #endif /* H5_HAVE_PARALLEL */
-                    if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, data_dxpl_id, *fill_buf) < 0)
+                    if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, udata.addr, chunk_size, dxpl_id, *fill_buf) < 0)
                         HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
 #ifdef H5_HAVE_PARALLEL
                 } /* end else */
@@ -3992,15 +3996,10 @@ H5D__chunk_allocate(const H5D_t *dset, hid_t dxpl_id, hbool_t full_overwrite,
     } /* end for(op_dim=0...) */
 
 #ifdef H5_HAVE_PARALLEL
-    /* Only need to block at the barrier if we actually initialized a chunk */
-    /* using an MPI-capable file driver */
+    /* Perform the final collective I/O over all collected chunk addresses */
     if(using_mpi && blocks_written) {
-        /* Wait at barrier to avoid race conditions where some processes are
-         * still writing out chunks and other processes race ahead to read
-         * them in, getting bogus data.
-         */
-        if(MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
-            HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
+        if(H5D__chunk_collective_fill(dset, dxpl_id, &chunk_info, chunk_size, fb_info.fill_buf) < 0)
+            HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
     } /* end if */
 #endif /* H5_HAVE_PARALLEL */
 
@@ -4015,6 +4014,13 @@ done:
     /* Free the unfiltered fill value buffer */
     unfilt_fill_buf = H5D__chunk_xfree(unfilt_fill_buf, &def_pline);
 
+#ifdef H5_HAVE_PARALLEL
+    if(using_mpi) {
+        if(chunk_info.addr)
+            HDfree(chunk_info.addr);
+    } /* end if */
+#endif
+
     FUNC_LEAVE_NOAPI_TAG(ret_value, FAIL)
 } /* end H5D__chunk_allocate() */
 
@@ -4217,6 +4223,162 @@ done:
     FUNC_LEAVE_NOAPI(ret_value)
 } /* end H5D__chunk_update_old_edge_chunks() */
 
+#ifdef H5_HAVE_PARALLEL
+
+/*-------------------------------------------------------------------------
+ * Function:	H5D__chunk_collective_fill
+ *
+ * Purpose:     Use an MPI-IO collective write to fill the chunks (when the
+ *              number of chunks to fill is greater than the number of MPI
+ *              processes; otherwise use independent I/O).
+ *
+ * Return:	Non-negative on success/Negative on failure
+ *
+ * Programmer:	Mohamad Chaarawi
+ * 		July 30, 2014
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t
+H5D__chunk_collective_fill(const H5D_t *dset, hid_t dxpl_id,
+    H5D_chunk_coll_info_t *chunk_info, size_t chunk_size, const void *fill_buf)
+{
+    MPI_Comm	mpi_comm = MPI_COMM_NULL;	/* MPI communicator for file */
+    int         mpi_rank = (-1);    /* This process's rank  */
+    int         mpi_size = (-1);    /* MPI Comm size  */
+    int         mpi_code;           /* MPI return code */
+    size_t      num_blocks;         /* Number of blocks between processes. */
+    size_t      leftover_blocks;    /* Number of leftover blocks to handle */
+    int         blocks, leftover, block_len; /* converted to int for MPI */
+    MPI_Aint    *chunk_disp_array = NULL;
+    int         *block_lens = NULL;
+    MPI_Datatype mem_type, file_type;
+    hid_t       data_dxpl_id = -1;  /* DXPL ID to use for raw data I/O operations */
+    int         i;                  /* Local index variable */
+    herr_t ret_value = SUCCEED;     /* Return value */
+
+    FUNC_ENTER_STATIC
+
+    /* Get the MPI communicator */
+    if(MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file)))
+        HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator")
+
+    /* Get the MPI rank */
+    if((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank")
+
+    /* Get the MPI size */
+    if((mpi_size = H5F_mpi_get_size(dset->oloc.file)) < 0)
+        HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI size")
+
+    /* Get a copy of the DXPL, to modify */
+    if((data_dxpl_id = H5P_copy_plist((H5P_genplist_t *)H5I_object(dxpl_id), TRUE)) < 0)
+        HGOTO_ERROR(H5E_PLIST, H5E_CANTCOPY, FAIL, "can't copy property list")
+
+    /* Distribute the blocks evenly among processes */
+    num_blocks = chunk_info->num_io / mpi_size; /* value should be the same on all procs */
+
+    /* After distributing the blocks evenly, determine how many leftover
+       blocks remain; these are assigned one apiece, round-robin, to the
+       lowest-ranked processes */
+    leftover_blocks = chunk_info->num_io % mpi_size;
+
+    /* Cast values to types needed by MPI */
+    H5_ASSIGN_OVERFLOW(blocks, num_blocks, size_t, int);
+    H5_ASSIGN_OVERFLOW(leftover, leftover_blocks, size_t, int);
+    H5_ASSIGN_OVERFLOW(block_len,  chunk_size, size_t, int);
+
+    /* Allocate buffers */
+    /* (MSC - should not need block_lens if MPI_Type_create_hindexed_block is working) */
+    if(NULL == (block_lens = (int *)H5MM_malloc((blocks + 1) * sizeof(int))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer")
+    if(NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((blocks + 1) * sizeof(MPI_Aint))))
+        HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer")
+
+    for(i = 0 ; i < blocks ; i++) {
+        /* store the chunk address as an MPI_Aint */
+        chunk_disp_array[i] = (MPI_Aint)(chunk_info->addr[i + mpi_rank*blocks]);
+
+        /* MSC - should not need this if MPI_Type_create_hindexed_block is working */
+        block_lens[i] = block_len;
+
+        /* Sanity check that the addresses in the datatype are
+           monotonically increasing */
+        if(i)
+            HDassert(chunk_disp_array[i] > chunk_disp_array[i - 1]);
+    } /* end for */
+
+    /* If there are leftover blocks after the even distribution, assign
+       them round-robin to processes 0 through (leftover - 1), one extra
+       block per process */
+    if(leftover && leftover > mpi_rank) {
+        chunk_disp_array[blocks] = (MPI_Aint)chunk_info->addr[blocks * mpi_size + mpi_rank];
+        block_lens[blocks] = block_len;
+        blocks++;
+    } /* end if */
+
+    /* MSC - should use MPI_Type_create_hindexed_block() once it is working:
+     *   mpi_code = MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array,
+     *                                             MPI_BYTE, &file_type); */
+    mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array,
+                                        MPI_BYTE, &file_type);
+    if(mpi_code != MPI_SUCCESS)
+        HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
+    if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
+        HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+
+    mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
+    if(mpi_code != MPI_SUCCESS)
+        HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
+    if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
+        HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
+
+    /* Set the MPI-IO VFD properties on the copied DXPL */
+    {
+        H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_COLLECTIVE;
+        H5P_genplist_t  *plist;      /* Property list pointer */
+
+        if(NULL == (plist = H5P_object_verify(data_dxpl_id, H5P_DATASET_XFER)))
+            HGOTO_ERROR(H5E_PLIST, H5E_BADTYPE, FAIL, "not a dataset transfer list")
+
+        /* Set buffer MPI type */
+        if(H5P_set(plist, H5FD_MPI_XFER_MEM_MPI_TYPE_NAME, &mem_type) < 0)
+            HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property")
+
+        /* Set File MPI type */
+        if(H5P_set(plist, H5FD_MPI_XFER_FILE_MPI_TYPE_NAME, &file_type) < 0)
+            HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set MPI-I/O property")
+
+        /* Set the transfer mode to collective */
+        if(H5P_set(plist, H5D_XFER_IO_XFER_MODE_NAME, &xfer_mode) < 0)
+            HGOTO_ERROR(H5E_PLIST, H5E_CANTSET, FAIL, "can't set transfer mode")
+    }
+
+    /* Perform the low-level collective write (the MPI file datatype set above describes the actual file locations) */
+    if(H5F_block_write(dset->oloc.file, H5FD_MEM_DRAW, (haddr_t)0, (blocks) ? (size_t)1 : (size_t)0, 
+                       data_dxpl_id, fill_buf) < 0)
+        HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file")
+
+    /* Barrier so processes don't race ahead */
+    if(MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
+        HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
+
+done:
+    if(data_dxpl_id > 0 && H5I_dec_ref(data_dxpl_id) < 0)
+        HDONE_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "Can't free property list")
+
+    /* Free the MPI datatypes and temporary buffers */
+    if(MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
+        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+    if(MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
+        HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
+    H5MM_xfree(chunk_disp_array);
+    H5MM_xfree(block_lens);
+
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* end H5D__chunk_collective_fill() */
+#endif /* H5_HAVE_PARALLEL */
+
 
 /*-------------------------------------------------------------------------
  * Function:	H5D__chunk_prune_fill
-- 
cgit v0.12