-rw-r--r--   examples/Makefile.am                 2
-rw-r--r--   examples/h5ff_client_analysis2.c   355
-rw-r--r--   testff/CMakeLists.txt                1
-rw-r--r--   testff/h5ff_client_analysis2.c     355
4 files changed, 712 insertions(+), 1 deletion(-)
diff --git a/examples/Makefile.am b/examples/Makefile.am
index 1689f91..91322a1 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -34,7 +34,7 @@ if BUILD_EFF_CONDITIONAL
h5ff_client_vl_data h5ff_python_exec h5ff_client_analysis \
h5ff_client_evict_deltas h5ff_client_prefetch \
h5ff_client_M6.2_demo h5ff_client_M7.2-pep_demo \
- h5ff_client_timings h5ff_client_time_datasets
+ h5ff_client_timings h5ff_client_time_datasets h5ff_client_analysis2
endif
if BUILD_INDEXING_CONDITIONAL
diff --git a/examples/h5ff_client_analysis2.c b/examples/h5ff_client_analysis2.c
new file mode 100644
index 0000000..8e7edca
--- /dev/null
+++ b/examples/h5ff_client_analysis2.c
@@ -0,0 +1,355 @@
+/*
+ * h5ff_client_analysis2.c: Client-side test for the analysis shipping routines.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include <mpi.h>
+#include <hdf5.h>
+
+/* define filename for this app, and max size after username prepended */
+#define FILENAME_APP "eff_analysis.h5"
+#define NAME_SIZE 64
+
+/* macros related to error reporting */
+#define STATUS (ret >= 0) ? " " : " - FAILED"
+#define ASSERT_RET assert( ret >= 0 )
+
+#define NTUPLES 512
+
+#define DATASET_POSITION "Position"
+#define DATASET_VELOCITY "Velocity"
+#define DATASET_PRESSURE "Pressure"
+#define DATASET_TEMPERATURE "Temperature"
+
+static int my_rank = 0, my_size = 1;
+
+struct analysis_data {
+ size_t ntuples;
+ hid_t type_id;
+ double *position;
+ double *velocity;
+ double *pressure;
+ double *temperature;
+};
+
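+/* Per-rank data layout (see init_data()): NTUPLES tuples, with 3 components
+ * per tuple for position and velocity and 1 component per tuple for pressure
+ * and temperature. */
+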
+/* Analysis scripts: per-block (split) and global (combine/integrate) sum and average */
+const char *split_script =
+ "import numpy as np\n"
+ "def split(array):\n"
+ " print '--------------------'\n"
+ " print 'Split sum: ' + str(array.sum())\n"
+ " print 'Split average: ' + str(np.average(array))\n"
+ " print '--------------------'\n"
+ " return np.array([array.sum(), np.average(array)])\n";
+
+const char *combine_script =
+ "import numpy as np\n"
+ "def combine(arrays):\n"
+ " global_sum = 0\n"
+ " global_average = 0\n"
+ " for a in arrays:\n"
+ " global_sum += a[0]\n"
+ " global_average += a[1]\n"
+ " global_average /= len(arrays)\n"
+ " print '--------------------'\n"
+ " print 'Combined sum: ' + str(global_sum)\n"
+ " print 'Combined average: ' + str(global_average)\n"
+ " print '--------------------'\n"
+ " return np.array([global_sum, global_average])\n";
+
+const char *integrate_script =
+ "import numpy as np\n"
+ "def integrate(arrays):\n"
+ " global_sum = 0\n"
+ " global_average = 0\n"
+ " for a in arrays:\n"
+ " global_sum += a[0]\n"
+ " global_average += a[1]\n"
+ " global_average /= len(arrays)\n"
+ " print '--------------------'\n"
+ " print 'Integrate sum: ' + str(global_sum)\n"
+ " print 'Integrate average: ' + str(global_average)\n"
+ " print '--------------------'\n"
+ " return np.array([global_sum, global_average])\n";
+
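+/* Taken together, the scripts above form the analysis pipeline shipped by
+ * H5ASinvoke(): split() is expected to run server-side on each block of
+ * elements selected by the query and returns a partial [sum, average];
+ * combine() merges the per-block results, and integrate() merges the
+ * combined results into a final [sum, average]. */
+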
+static void
+write_dataset(hid_t group_id, const char *dataset_name,
+ hsize_t ntuples, hsize_t ncomponents, hid_t datatype_id,
+ void *buf, hid_t trans_id, hid_t estack_id)
+{
+ hid_t dataset_id;
+ hid_t space_id;
+ hsize_t dims[2] = {ntuples, ncomponents};
+ int rank = (ncomponents == 1) ? 1 : 2;
+ herr_t ret;
+
+    /* Create the dataspace for the dataset. */
+ space_id = H5Screate_simple(rank, dims, NULL);
+ assert(space_id);
+
+ /* Create a dataset. */
+ dataset_id = H5Dcreate_ff(group_id, dataset_name, datatype_id, space_id,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, trans_id, estack_id);
+ assert(dataset_id);
+
+    /* Write the dataset. */
+ ret = H5Dwrite_ff(dataset_id, datatype_id, H5S_ALL, H5S_ALL,
+ H5P_DEFAULT, buf, trans_id, estack_id);
+ assert(0 == ret);
+
+    /* Close the dataset. */
+ ret = H5Dclose_ff(dataset_id, estack_id);
+ assert(0 == ret);
+
+ /* Close dataspace */
+ ret = H5Sclose(space_id);
+ assert(0 == ret);
+}
+
+
+static void
+init_data(size_t ntuples, struct analysis_data *data)
+{
+ size_t i,j;
+ size_t ncomponents;
+
+ data->type_id = H5T_NATIVE_DOUBLE;
+ data->ntuples = ntuples;
+
+ /* Initialize position */
+ ncomponents = 3;
+ data->position = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->position[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+
+ /* Initialize velocity */
+ data->velocity = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->velocity[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+
+ /* Initialize pressure */
+ ncomponents = 1;
+ data->pressure = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->pressure[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+
+ /* Initialize temperature */
+ data->temperature = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->temperature[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+}
+
+static void
+free_data(struct analysis_data *data)
+{
+ free(data->position);
+ free(data->velocity);
+ free(data->pressure);
+ free(data->temperature);
+}
+
+static void
+write_data(hid_t file_id, struct analysis_data data, hid_t estack_id)
+{
+ hid_t group_id;
+ hid_t trans_id, rcxt_id, trspl_id;
+ uint64_t version = 1;
+ herr_t ret;
+ char group_name[NAME_SIZE];
+ hsize_t ncomponents;
+
+ /* Every rank creates Group_"my_rank" */
+ sprintf(group_name, "Group_%d", my_rank);
+
+    /* Acquire a read context for container version 1: rank 0 acquires it,
+     * while the other ranks create a local handle for the same version. */
+ if(0 == my_rank) {
+ rcxt_id = H5RCacquire(file_id, &version, H5P_DEFAULT, estack_id);
+ } else {
+ rcxt_id = H5RCcreate(file_id, version);
+ }
+ assert(1 == version);
+
+ /* create transaction object */
+ trans_id = H5TRcreate(file_id, rcxt_id, version + 1);
+ assert(trans_id);
+
+ trspl_id = H5Pcreate(H5P_TR_START);
+ ret = H5Pset_trspl_num_peers(trspl_id, (unsigned int) my_size);
+ assert(0 == ret);
+ ret = H5TRstart(trans_id, trspl_id, estack_id);
+ assert(0 == ret);
+ ret = H5Pclose(trspl_id);
+ assert(0 == ret);
+
+ group_id = H5Gcreate_ff(file_id, group_name, H5P_DEFAULT, H5P_DEFAULT,
+ H5P_DEFAULT, trans_id, estack_id);
+ assert(group_id > 0);
+
+ /* Write datasets */
+ ncomponents = 3;
+ write_dataset(group_id, DATASET_POSITION, data.ntuples, ncomponents,
+ data.type_id, data.position, trans_id, estack_id);
+ write_dataset(group_id, DATASET_VELOCITY, data.ntuples, ncomponents,
+ data.type_id, data.velocity, trans_id, estack_id);
+
+ ncomponents = 1;
+ write_dataset(group_id, DATASET_PRESSURE, data.ntuples, ncomponents,
+ data.type_id, data.pressure, trans_id, estack_id);
+ write_dataset(group_id, DATASET_TEMPERATURE, data.ntuples, ncomponents,
+ data.type_id, data.temperature, trans_id, estack_id);
+
+ ret = H5Gclose_ff(group_id, estack_id);
+ assert(0 == ret);
+
+    /* Finish transaction 2. */
+ ret = H5TRfinish(trans_id, H5P_DEFAULT, NULL, estack_id);
+ assert(0 == ret);
+
+    /* Release container version 1. */
+ if (my_rank == 0) {
+ ret = H5RCrelease(rcxt_id, estack_id);
+ assert(0 == ret);
+ }
+
+ ret = H5RCclose(rcxt_id);
+ assert(0 == ret);
+
+ ret = H5TRclose(trans_id);
+ assert(0 == ret);
+}
+
+static void
+ship_analysis(const char *file_name)
+{
+ double lb = 39.1, ub = 42.1;
+ hid_t query_id1, query_id2, query_id3, query_id4;
+ hid_t query_id;
+ herr_t ret;
+
+ /* Create a simple query */
+ /* query = (39.1 < x < 42.1) AND (name = "Temperature") */
+ query_id1 = H5Qcreate(H5Q_TYPE_DATA_ELEM, H5Q_MATCH_GREATER_THAN,
+ H5T_NATIVE_DOUBLE, &lb);
+ assert(query_id1);
+
+ query_id2 = H5Qcreate(H5Q_TYPE_DATA_ELEM, H5Q_MATCH_LESS_THAN,
+ H5T_NATIVE_DOUBLE, &ub);
+ assert(query_id2);
+
+ query_id3 = H5Qcombine(query_id1, H5Q_COMBINE_AND, query_id2);
+ assert(query_id3);
+
+ query_id4 = H5Qcreate(H5Q_TYPE_LINK_NAME, H5Q_MATCH_EQUAL,
+ DATASET_TEMPERATURE);
+ assert(query_id4);
+
+ query_id = H5Qcombine(query_id3, H5Q_COMBINE_AND, query_id4);
+ assert(query_id);
+
+    /* Issue an analysis shipping request. */
+ ret = H5ASinvoke(file_name, query_id, split_script, combine_script,
+ integrate_script, H5_EVENT_STACK_NULL);
+ assert(0 == ret);
+
+ H5Qclose(query_id);
+ H5Qclose(query_id4);
+ H5Qclose(query_id3);
+ H5Qclose(query_id2);
+ H5Qclose(query_id1);
+}
+
+int
+main(int argc, char **argv)
+{
+ const char *user_name;
+ char file_name[NAME_SIZE];
+ hid_t file_id, fapl_id;
+ hid_t estack_id = H5_EVENT_STACK_NULL;
+ herr_t ret;
+ H5ES_status_t status;
+ size_t num_events = 0;
+ struct analysis_data data;
+ size_t ntuples = NTUPLES;
+
+    /* Prepend the user name to the file name (fall back if USER is unset) */
+    user_name = getenv("USER");
+    if (!user_name) user_name = "test";
+    snprintf(file_name, NAME_SIZE, "%s_%s", user_name, FILENAME_APP);
+
+ MPI_Init(&argc, &argv);
+
+ /* Call EFF_init to initialize the EFF stack. */
+ EFF_init(MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &my_size);
+ fprintf(stderr, "APP processes = %d, my rank is %d\n", my_size, my_rank);
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /* Choose the IOD VOL plugin to use with this file. */
+ fapl_id = H5Pcreate(H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+    /* Create an event stack for managing asynchronous requests. */
+ estack_id = H5EScreate();
+ assert(estack_id);
+
+    /* Create the file (truncating any existing one). */
+ file_id = H5Fcreate_ff(file_name, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id,
+ H5_EVENT_STACK_NULL);
+
+ ret = H5Pclose(fapl_id);
+ assert(0 == ret);
+
+ init_data(ntuples, &data);
+
+ write_data(file_id, data, estack_id);
+
+ H5ESget_count(estack_id, &num_events);
+ H5ESwait_all(estack_id, &status);
+ H5ESclear(estack_id);
+ printf("%zu events in event stack. Completion status = %d\n", num_events, status);
+ assert(status == H5ES_STATUS_SUCCEED);
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ free_data(&data);
+
+ if(0 == my_rank) {
+ ship_analysis(file_name);
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ ret = H5ESclose(estack_id);
+ assert(ret == 0);
+
+ /* Close the file. */
+ ret = H5Fclose_ff(file_id, 1, H5_EVENT_STACK_NULL);
+ assert(0 == ret);
+
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/testff/CMakeLists.txt b/testff/CMakeLists.txt
index bc28dff..a3076a5 100644
--- a/testff/CMakeLists.txt
+++ b/testff/CMakeLists.txt
@@ -71,6 +71,7 @@ BUILD_H5FF_TEST(h5ff_query)
# ADD_H5FF_DRIVER_TEST(<target>)
BUILD_H5FF_TEST(h5ff_server)
BUILD_H5FF_TEST(h5ff_client_analysis)
+BUILD_H5FF_TEST(h5ff_client_analysis2)
BUILD_H5FF_TEST(h5ff_client_attr)
BUILD_H5FF_TEST(h5ff_client_dset)
BUILD_H5FF_TEST(h5ff_client_evict_deltas)
diff --git a/testff/h5ff_client_analysis2.c b/testff/h5ff_client_analysis2.c
new file mode 100644
index 0000000..8e7edca
--- /dev/null
+++ b/testff/h5ff_client_analysis2.c
@@ -0,0 +1,355 @@
+/*
+ * h5ff_client_analysis2.c: Client-side test for the analysis shipping routines.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include <mpi.h>
+#include <hdf5.h>
+
+/* define filename for this app, and max size after username prepended */
+#define FILENAME_APP "eff_analysis.h5"
+#define NAME_SIZE 64
+
+/* macros related to error reporting */
+#define STATUS (ret >= 0) ? " " : " - FAILED"
+#define ASSERT_RET assert( ret >= 0 )
+
+#define NTUPLES 512
+
+#define DATASET_POSITION "Position"
+#define DATASET_VELOCITY "Velocity"
+#define DATASET_PRESSURE "Pressure"
+#define DATASET_TEMPERATURE "Temperature"
+
+static int my_rank = 0, my_size = 1;
+
+struct analysis_data {
+ size_t ntuples;
+ hid_t type_id;
+ double *position;
+ double *velocity;
+ double *pressure;
+ double *temperature;
+};
+
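+/* Per-rank data layout (see init_data()): NTUPLES tuples, with 3 components
+ * per tuple for position and velocity and 1 component per tuple for pressure
+ * and temperature. */
+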
+/* Analysis scripts: per-block (split) and global (combine/integrate) sum and average */
+const char *split_script =
+ "import numpy as np\n"
+ "def split(array):\n"
+ " print '--------------------'\n"
+ " print 'Split sum: ' + str(array.sum())\n"
+ " print 'Split average: ' + str(np.average(array))\n"
+ " print '--------------------'\n"
+ " return np.array([array.sum(), np.average(array)])\n";
+
+const char *combine_script =
+ "import numpy as np\n"
+ "def combine(arrays):\n"
+ " global_sum = 0\n"
+ " global_average = 0\n"
+ " for a in arrays:\n"
+ " global_sum += a[0]\n"
+ " global_average += a[1]\n"
+ " global_average /= len(arrays)\n"
+ " print '--------------------'\n"
+ " print 'Combined sum: ' + str(global_sum)\n"
+ " print 'Combined average: ' + str(global_average)\n"
+ " print '--------------------'\n"
+ " return np.array([global_sum, global_average])\n";
+
+const char *integrate_script =
+ "import numpy as np\n"
+ "def integrate(arrays):\n"
+ " global_sum = 0\n"
+ " global_average = 0\n"
+ " for a in arrays:\n"
+ " global_sum += a[0]\n"
+ " global_average += a[1]\n"
+ " global_average /= len(arrays)\n"
+ " print '--------------------'\n"
+ " print 'Integrate sum: ' + str(global_sum)\n"
+ " print 'Integrate average: ' + str(global_average)\n"
+ " print '--------------------'\n"
+ " return np.array([global_sum, global_average])\n";
+
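+/* Taken together, the scripts above form the analysis pipeline shipped by
+ * H5ASinvoke(): split() is expected to run server-side on each block of
+ * elements selected by the query and returns a partial [sum, average];
+ * combine() merges the per-block results, and integrate() merges the
+ * combined results into a final [sum, average]. */
+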
+static void
+write_dataset(hid_t group_id, const char *dataset_name,
+ hsize_t ntuples, hsize_t ncomponents, hid_t datatype_id,
+ void *buf, hid_t trans_id, hid_t estack_id)
+{
+ hid_t dataset_id;
+ hid_t space_id;
+ hsize_t dims[2] = {ntuples, ncomponents};
+ int rank = (ncomponents == 1) ? 1 : 2;
+ herr_t ret;
+
+    /* Create the dataspace for the dataset. */
+ space_id = H5Screate_simple(rank, dims, NULL);
+ assert(space_id);
+
+ /* Create a dataset. */
+ dataset_id = H5Dcreate_ff(group_id, dataset_name, datatype_id, space_id,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, trans_id, estack_id);
+ assert(dataset_id);
+
+    /* Write the dataset. */
+ ret = H5Dwrite_ff(dataset_id, datatype_id, H5S_ALL, H5S_ALL,
+ H5P_DEFAULT, buf, trans_id, estack_id);
+ assert(0 == ret);
+
+    /* Close the dataset. */
+ ret = H5Dclose_ff(dataset_id, estack_id);
+ assert(0 == ret);
+
+ /* Close dataspace */
+ ret = H5Sclose(space_id);
+ assert(0 == ret);
+}
+
+
+static void
+init_data(size_t ntuples, struct analysis_data *data)
+{
+ size_t i,j;
+ size_t ncomponents;
+
+ data->type_id = H5T_NATIVE_DOUBLE;
+ data->ntuples = ntuples;
+
+ /* Initialize position */
+ ncomponents = 3;
+ data->position = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->position[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+
+ /* Initialize velocity */
+ data->velocity = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->velocity[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+
+ /* Initialize pressure */
+ ncomponents = 1;
+ data->pressure = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->pressure[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+
+ /* Initialize temperature */
+ data->temperature = (double *) malloc(sizeof(double) * ncomponents * ntuples);
+ for (i = 0; i < ntuples; i++) {
+ for (j = 0; j < ncomponents; j++) {
+ data->temperature[ncomponents * i + j] =
+ (double) (((hsize_t) my_rank) * ntuples + i);
+ }
+ }
+}
+
+static void
+free_data(struct analysis_data *data)
+{
+ free(data->position);
+ free(data->velocity);
+ free(data->pressure);
+ free(data->temperature);
+}
+
+static void
+write_data(hid_t file_id, struct analysis_data data, hid_t estack_id)
+{
+ hid_t group_id;
+ hid_t trans_id, rcxt_id, trspl_id;
+ uint64_t version = 1;
+ herr_t ret;
+ char group_name[NAME_SIZE];
+ hsize_t ncomponents;
+
+ /* Every rank creates Group_"my_rank" */
+ sprintf(group_name, "Group_%d", my_rank);
+
+    /* Acquire a read context for container version 1: rank 0 acquires it,
+     * while the other ranks create a local handle for the same version. */
+ if(0 == my_rank) {
+ rcxt_id = H5RCacquire(file_id, &version, H5P_DEFAULT, estack_id);
+ } else {
+ rcxt_id = H5RCcreate(file_id, version);
+ }
+ assert(1 == version);
+
+ /* create transaction object */
+ trans_id = H5TRcreate(file_id, rcxt_id, version + 1);
+ assert(trans_id);
+
+ trspl_id = H5Pcreate(H5P_TR_START);
+ ret = H5Pset_trspl_num_peers(trspl_id, (unsigned int) my_size);
+ assert(0 == ret);
+ ret = H5TRstart(trans_id, trspl_id, estack_id);
+ assert(0 == ret);
+ ret = H5Pclose(trspl_id);
+ assert(0 == ret);
+
+ group_id = H5Gcreate_ff(file_id, group_name, H5P_DEFAULT, H5P_DEFAULT,
+ H5P_DEFAULT, trans_id, estack_id);
+ assert(group_id > 0);
+
+ /* Write datasets */
+ ncomponents = 3;
+ write_dataset(group_id, DATASET_POSITION, data.ntuples, ncomponents,
+ data.type_id, data.position, trans_id, estack_id);
+ write_dataset(group_id, DATASET_VELOCITY, data.ntuples, ncomponents,
+ data.type_id, data.velocity, trans_id, estack_id);
+
+ ncomponents = 1;
+ write_dataset(group_id, DATASET_PRESSURE, data.ntuples, ncomponents,
+ data.type_id, data.pressure, trans_id, estack_id);
+ write_dataset(group_id, DATASET_TEMPERATURE, data.ntuples, ncomponents,
+ data.type_id, data.temperature, trans_id, estack_id);
+
+ ret = H5Gclose_ff(group_id, estack_id);
+ assert(0 == ret);
+
+    /* Finish transaction 2. */
+ ret = H5TRfinish(trans_id, H5P_DEFAULT, NULL, estack_id);
+ assert(0 == ret);
+
+    /* Release container version 1. */
+ if (my_rank == 0) {
+ ret = H5RCrelease(rcxt_id, estack_id);
+ assert(0 == ret);
+ }
+
+ ret = H5RCclose(rcxt_id);
+ assert(0 == ret);
+
+ ret = H5TRclose(trans_id);
+ assert(0 == ret);
+}
+
+static void
+ship_analysis(const char *file_name)
+{
+ double lb = 39.1, ub = 42.1;
+ hid_t query_id1, query_id2, query_id3, query_id4;
+ hid_t query_id;
+ herr_t ret;
+
+ /* Create a simple query */
+ /* query = (39.1 < x < 42.1) AND (name = "Temperature") */
+ query_id1 = H5Qcreate(H5Q_TYPE_DATA_ELEM, H5Q_MATCH_GREATER_THAN,
+ H5T_NATIVE_DOUBLE, &lb);
+ assert(query_id1);
+
+ query_id2 = H5Qcreate(H5Q_TYPE_DATA_ELEM, H5Q_MATCH_LESS_THAN,
+ H5T_NATIVE_DOUBLE, &ub);
+ assert(query_id2);
+
+ query_id3 = H5Qcombine(query_id1, H5Q_COMBINE_AND, query_id2);
+ assert(query_id3);
+
+ query_id4 = H5Qcreate(H5Q_TYPE_LINK_NAME, H5Q_MATCH_EQUAL,
+ DATASET_TEMPERATURE);
+ assert(query_id4);
+
+ query_id = H5Qcombine(query_id3, H5Q_COMBINE_AND, query_id4);
+ assert(query_id);
+
+    /* Issue an analysis shipping request. */
+ ret = H5ASinvoke(file_name, query_id, split_script, combine_script,
+ integrate_script, H5_EVENT_STACK_NULL);
+ assert(0 == ret);
+
+ H5Qclose(query_id);
+ H5Qclose(query_id4);
+ H5Qclose(query_id3);
+ H5Qclose(query_id2);
+ H5Qclose(query_id1);
+}
+
+int
+main(int argc, char **argv)
+{
+ const char *user_name;
+ char file_name[NAME_SIZE];
+ hid_t file_id, fapl_id;
+ hid_t estack_id = H5_EVENT_STACK_NULL;
+ herr_t ret;
+ H5ES_status_t status;
+ size_t num_events = 0;
+ struct analysis_data data;
+ size_t ntuples = NTUPLES;
+
+    /* Prepend the user name to the file name (fall back if USER is unset) */
+    user_name = getenv("USER");
+    if (!user_name) user_name = "test";
+    snprintf(file_name, NAME_SIZE, "%s_%s", user_name, FILENAME_APP);
+
+ MPI_Init(&argc, &argv);
+
+ /* Call EFF_init to initialize the EFF stack. */
+ EFF_init(MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &my_size);
+ fprintf(stderr, "APP processes = %d, my rank is %d\n", my_size, my_rank);
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /* Choose the IOD VOL plugin to use with this file. */
+ fapl_id = H5Pcreate(H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+    /* Create an event stack for managing asynchronous requests. */
+ estack_id = H5EScreate();
+ assert(estack_id);
+
+    /* Create the file (truncating any existing one). */
+ file_id = H5Fcreate_ff(file_name, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id,
+ H5_EVENT_STACK_NULL);
+
+ ret = H5Pclose(fapl_id);
+ assert(0 == ret);
+
+ init_data(ntuples, &data);
+
+ write_data(file_id, data, estack_id);
+
+ H5ESget_count(estack_id, &num_events);
+ H5ESwait_all(estack_id, &status);
+ H5ESclear(estack_id);
+ printf("%zu events in event stack. Completion status = %d\n", num_events, status);
+ assert(status == H5ES_STATUS_SUCCEED);
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ free_data(&data);
+
+ if(0 == my_rank) {
+ ship_analysis(file_name);
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ ret = H5ESclose(estack_id);
+ assert(ret == 0);
+
+ /* Close the file. */
+ ret = H5Fclose_ff(file_id, 1, H5_EVENT_STACK_NULL);
+ assert(0 == ret);
+
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}