summaryrefslogtreecommitdiffstats
path: root/src/H5Dio.c
diff options
context:
space:
mode:
authorjhendersonHDF <jhenderson@hdfgroup.org>2023-10-10 15:11:22 (GMT)
committerGitHub <noreply@github.com>2023-10-10 15:11:22 (GMT)
commitbfbfaf72e17fcc9efa848557a0c57c0583d5c8c4 (patch)
tree46c6309ab264df8af6bba567713a22bb187baa2f /src/H5Dio.c
parent7631015ea4af183c01025c4907869f47f7355c51 (diff)
downloadhdf5-bfbfaf72e17fcc9efa848557a0c57c0583d5c8c4.zip
hdf5-bfbfaf72e17fcc9efa848557a0c57c0583d5c8c4.tar.gz
hdf5-bfbfaf72e17fcc9efa848557a0c57c0583d5c8c4.tar.bz2
Update parallel compression feature to support multi-dataset I/O (#3591)
Diffstat (limited to 'src/H5Dio.c')
-rw-r--r--src/H5Dio.c79
1 files changed, 53 insertions, 26 deletions
diff --git a/src/H5Dio.c b/src/H5Dio.c
index 543bb56..2134ce1 100644
--- a/src/H5Dio.c
+++ b/src/H5Dio.c
@@ -107,6 +107,17 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
FUNC_ENTER_NOAPI(FAIL)
+#ifdef H5_HAVE_PARALLEL
+ /* Reset the actual io mode properties to the default values in case
+ * the DXPL (if it's non-default) was previously used in a collective
+ * I/O operation.
+ */
+ if (!H5CX_is_def_dxpl()) {
+ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
+ H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
+ } /* end if */
+#endif
+
/* Init io_info */
if (H5D__ioinfo_init(count, H5D_IO_OP_READ, dset_info, &io_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
@@ -222,6 +233,14 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
dset_info[i].buf.vp = (void *)(((uint8_t *)dset_info[i].buf.vp) + buf_adj);
} /* end if */
+ /* Set up I/O operation */
+ if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to set up I/O operation");
+
+ /* Check if any filters are applied to the dataset */
+ if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0)
+ io_info.filtered_count++;
+
/* If space hasn't been allocated and not using external storage,
* return fill value to buffer if fill time is upon allocation, or
* do nothing if fill time is never. If the dataset is compact and
@@ -259,10 +278,6 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
io_skipped = io_skipped + 1;
} /* end if */
else {
- /* Set up I/O operation */
- if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_UNSUPPORTED, FAIL, "unable to set up I/O operation");
-
/* Sanity check that space is allocated, if there are elements */
if (dset_info[i].nelmts > 0)
assert(
@@ -273,22 +288,23 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
dset_info[i].dset->shared->dcpl_cache.efl.nused > 0 ||
dset_info[i].dset->shared->layout.type == H5D_COMPACT);
- /* Call storage method's I/O initialization routine */
- if (dset_info[i].layout_ops.io_init &&
- (dset_info[i].layout_ops.io_init)(&io_info, &(dset_info[i])) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
dset_info[i].skip_io = false;
- io_op_init++;
-
- /* Reset metadata tagging */
- H5AC_tag(prev_tag, NULL);
}
+
+ /* Call storage method's I/O initialization routine */
+ if (dset_info[i].layout_ops.io_init &&
+ (dset_info[i].layout_ops.io_init)(&io_info, &(dset_info[i])) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
+ io_op_init++;
+
+ /* Reset metadata tagging */
+ H5AC_tag(prev_tag, NULL);
} /* end of for loop */
- assert(io_op_init + io_skipped == count);
+ assert(io_op_init == count);
/* If no datasets have I/O, we're done */
- if (io_op_init == 0)
+ if (io_skipped == count)
HGOTO_DONE(SUCCEED);
/* Perform second phase of type info initialization */
@@ -323,7 +339,11 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
}
/* MDIO-specific second phase initialization */
- for (i = 0; i < count; i++)
+ for (i = 0; i < count; i++) {
+ /* Check for skipped I/O */
+ if (dset_info[i].skip_io)
+ continue;
+
if (dset_info[i].layout_ops.mdio_init) {
haddr_t prev_tag = HADDR_UNDEF;
@@ -337,6 +357,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
/* Reset metadata tagging */
H5AC_tag(prev_tag, NULL);
}
+ }
/* Invoke correct "high level" I/O routine */
if ((*io_info.md_io_ops.multi_read_md)(&io_info) < 0)
@@ -430,7 +451,7 @@ H5D__read(size_t count, H5D_dset_io_info_t *dset_info)
done:
/* Shut down the I/O op information */
for (i = 0; i < io_op_init; i++)
- if (!dset_info[i].skip_io && dset_info[i].layout_ops.io_term &&
+ if (dset_info[i].layout_ops.io_term &&
(*dset_info[i].layout_ops.io_term)(&io_info, &(dset_info[i])) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTCLOSEOBJ, FAIL, "unable to shut down I/O op info");
@@ -512,6 +533,17 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
FUNC_ENTER_NOAPI(FAIL)
+#ifdef H5_HAVE_PARALLEL
+ /* Reset the actual io mode properties to the default values in case
+ * the DXPL (if it's non-default) was previously used in a collective
+ * I/O operation.
+ */
+ if (!H5CX_is_def_dxpl()) {
+ H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
+ H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
+ } /* end if */
+#endif
+
/* Init io_info */
if (H5D__ioinfo_init(count, H5D_IO_OP_WRITE, dset_info, &io_info) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "can't initialize I/O info");
@@ -586,7 +618,7 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
if (NULL == dset_info[i].buf.cvp) {
/* Check for any elements selected (which is invalid) */
if (dset_info[i].nelmts > 0)
- HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no output buffer");
+ HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "no input buffer");
/* If the buffer is nil, and 0 element is selected, make a fake buffer.
* This is for some MPI package like ChaMPIon on NCSA's tungsten which
@@ -655,6 +687,10 @@ H5D__write(size_t count, H5D_dset_io_info_t *dset_info)
if (H5D__dset_ioinfo_init(dset_info[i].dset, &(dset_info[i]), &(store[i])) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to set up I/O operation");
+ /* Check if any filters are applied to the dataset */
+ if (dset_info[i].dset->shared->dcpl_cache.pline.nused > 0)
+ io_info.filtered_count++;
+
/* Allocate dataspace and initialize it if it hasn't been. */
should_alloc_space = dset_info[i].dset->shared->dcpl_cache.efl.nused == 0 &&
!(*dset_info[i].dset->shared->layout.ops->is_space_alloc)(
@@ -1225,15 +1261,6 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info)
dset0 = io_info->dsets_info[0].dset;
assert(dset0->oloc.file);
- /* Reset the actual io mode properties to the default values in case
- * the DXPL (if it's non-default) was previously used in a collective
- * I/O operation.
- */
- if (!H5CX_is_def_dxpl()) {
- H5CX_set_mpio_actual_chunk_opt(H5D_MPIO_NO_CHUNK_OPTIMIZATION);
- H5CX_set_mpio_actual_io_mode(H5D_MPIO_NO_COLLECTIVE);
- } /* end if */
-
/* Make any parallel I/O adjustments */
if (io_info->using_mpi_vfd) {
H5FD_mpio_xfer_t xfer_mode; /* Parallel transfer for this request */