diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/h5ff_client_index.c | 230 |
1 files changed, 173 insertions, 57 deletions
diff --git a/examples/h5ff_client_index.c b/examples/h5ff_client_index.c index 2a8c746..0e110bf 100644 --- a/examples/h5ff_client_index.c +++ b/examples/h5ff_client_index.c @@ -10,22 +10,27 @@ #include <assert.h> #include <string.h> +/* define filename for this app, and max size after username prepended */ +#define FILENAME_APP "eff_index.h5" +#define NAME_SIZE 64 + #define NTUPLES 256 static int my_rank = 0, my_size = 1; -static void +static herr_t write_dataset(hid_t file_id, const char *dataset_name, hsize_t total, hsize_t ncomponents, hid_t datatype_id, - hsize_t ntuples, hsize_t start, void *buf, hid_t estack_id) + hsize_t ntuples, hsize_t start, void *buf, uint64_t req_version, + hid_t estack_id) { hid_t dataset_id; hid_t file_space_id, mem_space_id; - hid_t trans_id, rcxt_id, trspl_id; + hid_t trans_id, rcxt_id = -1, trspl_id; hsize_t dims[2] = {total, ncomponents}; /* hsize_t offset[2] = {start, 0}; */ hsize_t count[2] = {ntuples, ncomponents}; int rank = (ncomponents == 1) ? 1 : 2; - uint64_t version = 1; + uint64_t version = req_version; herr_t ret; (void) start; @@ -33,10 +38,12 @@ write_dataset(hid_t file_id, const char *dataset_name, /* acquire container version 1 - EXACT. */ if(0 == my_rank) { rcxt_id = H5RCacquire(file_id, &version, H5P_DEFAULT, H5_EVENT_STACK_NULL); - } else { + } + MPI_Bcast(&version, 1, MPI_UINT64_T, 0, MPI_COMM_WORLD); + if (0 != my_rank) { rcxt_id = H5RCcreate(file_id, version); } - assert(1 == version); + assert(req_version == version); /* create transaction object */ trans_id = H5TRcreate(file_id, rcxt_id, version + 1); @@ -93,24 +100,28 @@ write_dataset(hid_t file_id, const char *dataset_name, ret = H5TRclose(trans_id); assert(0 == ret); + + return ret; } -static void +static herr_t create_index(hid_t file_id, const char *dataset_name, unsigned plugin_id, - hid_t estack_id) + uint64_t req_version, hid_t estack_id) { - hid_t dataset_id, trans_id, rcxt_id; + hid_t dataset_id, trans_id, rcxt_id = -1; hid_t trspl_id; - uint64_t version = 2; + uint64_t version = req_version; herr_t ret; /* acquire container version 1 - EXACT. */ if(0 == my_rank) { rcxt_id = H5RCacquire(file_id, &version, H5P_DEFAULT, H5_EVENT_STACK_NULL); - } else { + } + MPI_Bcast(&version, 1, MPI_UINT64_T, 0, MPI_COMM_WORLD); + if (0 != my_rank) { rcxt_id = H5RCcreate(file_id, version); } - assert(2 == version); + assert(req_version == version); /* create transaction object */ trans_id = H5TRcreate(file_id, rcxt_id, version + 1); @@ -151,24 +162,100 @@ create_index(hid_t file_id, const char *dataset_name, unsigned plugin_id, ret = H5TRclose(trans_id); assert(0 == ret); + + return ret; } -static void -query_and_view(hid_t file_id, const char *dataset_name, hid_t estack_id) +static herr_t +write_incr(hid_t file_id, const char *dataset_name, + hsize_t total, hsize_t ncomponents, + hsize_t ntuples, hsize_t start, void *buf, uint64_t req_version, + hid_t estack_id) +{ + hid_t dataset_id; + hid_t file_space_id; +// hsize_t dims[2] = {total, ncomponents}; + hsize_t offset[2] = {start, 0}; + hsize_t count[2] = {ntuples, ncomponents}; + int rank = (ncomponents == 1) ? 1 : 2; + herr_t ret; + uint64_t version = req_version; + int n; + + (void) total; + + /* do incremental updates */ + for (n = 0; n < my_size; n++) { + if (my_rank == n) { + hid_t tid, rid, mem_space_id; + + version = (uint64_t) (req_version + (uint64_t) n); + rid = H5RCacquire(file_id, &version, H5P_DEFAULT, estack_id); + assert((uint64_t )(req_version + (uint64_t) n) == version); + + dataset_id = H5Dopen_ff(file_id, dataset_name, H5P_DEFAULT, rid, estack_id); + assert(dataset_id); + + file_space_id = H5Dget_space(dataset_id); + assert(file_space_id); + + /* create transaction object */ + tid = H5TRcreate(file_id, rid, (uint64_t) (req_version + 1 + (uint64_t) n)); + assert(tid); + ret = H5TRstart(tid, H5P_DEFAULT, estack_id); + assert(0 == ret); + + fprintf(stderr, "Rank %d Doing its Updates with TR %lu RC %lu\n", + my_rank, req_version + 1 + (uint64_t) n, + req_version + (uint64_t) n); + + mem_space_id = H5Screate_simple(rank, count, NULL); + assert(mem_space_id); + /* write data to datasets */ + ret = H5Sselect_hyperslab(file_space_id, H5S_SELECT_SET, offset, + NULL, count, NULL); + assert(0 == ret); + ret = H5Dwrite_ff(dataset_id, H5T_NATIVE_FLOAT, mem_space_id, + file_space_id, H5P_DEFAULT, buf, tid, estack_id); + assert(0 == ret); + ret = H5Sclose(mem_space_id); + assert(0 == ret); + + ret = H5TRfinish(tid, H5P_DEFAULT, NULL, estack_id); + assert(0 == ret); + H5TRclose(tid); + ret = H5RCrelease(rid, H5_EVENT_STACK_NULL); + assert(0 == ret); + ret = H5RCclose(rid); + + /* Close the first dataset. */ + H5Sclose(file_space_id); + ret = H5Dclose_ff(dataset_id, estack_id); + assert(0 == ret); + } + MPI_Barrier(MPI_COMM_WORLD); + } + + return ret; +} + +static herr_t +query_and_view(hid_t file_id, const char *dataset_name, uint64_t req_version, + hid_t estack_id) { float query_lb, query_ub; hid_t query_id1, query_id2; hid_t query_id; hid_t dataset_id; - hid_t rcxt_id; - uint64_t version = 3; + hid_t rcxt_id = -1; + uint64_t version = req_version; herr_t ret; double t1, t2; /* Create a simple query */ /* query = (39.1 < x < 42.1) || (295 < x < 298) */ query_lb = (my_rank == 0) ? 38.8f : 295.f; - query_ub = (my_rank == 0) ? 42.1f : 298.f; + query_ub = (my_rank == 0) ? 42.8f : 298.f; query_id1 = H5Qcreate(H5Q_TYPE_DATA_ELEM, H5Q_MATCH_GREATER_THAN, H5T_NATIVE_FLOAT, &query_lb); @@ -181,36 +268,34 @@ query_and_view(hid_t file_id, const char *dataset_name, hid_t estack_id) query_id = H5Qcombine(query_id1, H5Q_COMBINE_AND, query_id2); assert(query_id); - /* acquire container version 2 - EXACT. */ - version = 3; - rcxt_id = H5RCacquire(file_id, &version, H5P_DEFAULT, estack_id); - assert(rcxt_id > 0); - assert(3 == version); - - MPI_Barrier(MPI_COMM_WORLD); - - dataset_id = H5Dopen_ff(file_id, dataset_name, H5P_DEFAULT, rcxt_id, - estack_id); - -// view_id = H5Vcreate_ff(dataset_id, query_id, H5P_DEFAULT, rid2, -// estack_id); -// assert(view_id > 0); + if(0 == my_rank) { + rcxt_id = H5RCacquire(file_id, &version, H5P_DEFAULT, H5_EVENT_STACK_NULL); + } + MPI_Bcast(&version, 1, MPI_UINT64_T, 0, MPI_COMM_WORLD); + if (0 != my_rank) { + rcxt_id = H5RCcreate(file_id, version); + } + assert(req_version == version); - t1 = MPI_Wtime(); - H5Dquery_ff(dataset_id, query_id, -1, rcxt_id); - t2 = MPI_Wtime(); + if (0 == my_rank) { + dataset_id = H5Dopen_ff(file_id, dataset_name, H5P_DEFAULT, rcxt_id, + estack_id); - printf("Query time: %lf ms\n", (t2 - t1) * 1000); - /* TODO use view_id for analysis shipping etc */ + t1 = MPI_Wtime(); + H5Dquery_ff(dataset_id, query_id, -1, rcxt_id); + t2 = MPI_Wtime(); -// H5Vclose(view_id); + printf("Query time: %lf ms\n", (t2 - t1) * 1000); - ret = H5Dclose_ff(dataset_id, estack_id); - assert(0 == ret); + ret = H5Dclose_ff(dataset_id, estack_id); + assert(0 == ret); + } /* release container version 2. */ - ret = H5RCrelease(rcxt_id, estack_id); - assert(0 == ret); + if (my_rank == 0) { + ret = H5RCrelease(rcxt_id, estack_id); + assert(0 == ret); + } ret = H5RCclose(rcxt_id); assert(0 == ret); @@ -218,22 +303,27 @@ query_and_view(hid_t file_id, const char *dataset_name, hid_t estack_id) H5Qclose(query_id); H5Qclose(query_id2); H5Qclose(query_id1); + + return ret; } int main(int argc, char **argv) { unsigned plugin_id; - char file_name[50]; - char dataset_name[64]; + char file_name[NAME_SIZE]; + char dataset_name[NAME_SIZE]; hsize_t ntuples = NTUPLES; + hsize_t ntuples_multiplier = 1; hsize_t ncomponents = 3; hsize_t start, total; float *data; hid_t file_id, fapl_id; hid_t estack_id = H5_EVENT_STACK_NULL; herr_t ret; + uint64_t req_version; hsize_t i, j; + int incr_update; sprintf(file_name, "%s_%s", getenv("USER"), "eff_file_index.h5"); @@ -241,11 +331,14 @@ main(int argc, char **argv) MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); MPI_Comm_size(MPI_COMM_WORLD, &my_size); - if (argc < 2) { - if (my_rank ==0) printf("Usage: %s <plugin_id>\n", argv[0]); + if (argc < 3) { + if (my_rank ==0) printf("Usage: %s <ntuples_multiplier> <plugin_id> <do incremental update>\n", argv[0]); exit(0); } - plugin_id = atoi(argv[1]); + ntuples_multiplier = (hsize_t) atoi(argv[1]); + ntuples *= ntuples_multiplier; + plugin_id = (unsigned) atoi(argv[2]); + incr_update = atoi(argv[3]); /* Call EFF_init to initialize the EFF stack. */ EFF_init(MPI_COMM_WORLD, MPI_INFO_NULL); @@ -256,16 +349,14 @@ main(int argc, char **argv) memset(dataset_name, '\0', 64); sprintf(dataset_name, "D%d", my_rank); - /* We write to separate datasets */ - start = 0; - total = ntuples; - /* Initialize the dataset. */ - data = (float *) malloc(sizeof(float) * ncomponents * ntuples); - - for (i = 0; i < ntuples; i++) { + data = (float *) malloc(sizeof(float) * ncomponents * ntuples * (hsize_t) my_size); + for (i = 0; i < ntuples * (hsize_t) my_size; i++) { for (j = 0; j < ncomponents; j++) { - data[ncomponents * i + j] = (float) (((hsize_t) my_rank) * ntuples + i); + if (my_rank != 0) + data[ncomponents * i + j] = 41.0f; + else + data[ncomponents * i + j] = (float) i; } } @@ -282,19 +373,41 @@ main(int argc, char **argv) ret = H5Pclose(fapl_id); assert(0 == ret); + /* We write to separate datasets */ + start = 0; + total = ntuples * (hsize_t) my_size; + + req_version = 1; write_dataset(file_id, dataset_name, total, ncomponents, H5T_NATIVE_FLOAT, - ntuples, start, data, estack_id); + ntuples * (hsize_t) my_size, start, data, req_version, estack_id); MPI_Barrier(MPI_COMM_WORLD); - create_index(file_id, dataset_name, plugin_id, estack_id); + req_version++; + create_index(file_id, dataset_name, plugin_id, req_version, estack_id); MPI_Barrier(MPI_COMM_WORLD); - query_and_view(file_id, dataset_name, estack_id); + req_version++; + query_and_view(file_id, dataset_name, req_version, estack_id); MPI_Barrier(MPI_COMM_WORLD); + if (incr_update) { + start = ntuples * (hsize_t) my_rank; + total = ntuples * (hsize_t) my_size; + + write_incr(file_id, "D0", total, ncomponents, ntuples, start, data, + req_version, estack_id); + + MPI_Barrier(MPI_COMM_WORLD); + + req_version += (uint64_t) my_size; + query_and_view(file_id, "D0", req_version, estack_id); + + MPI_Barrier(MPI_COMM_WORLD); + } + /* Close the file. */ ret = H5Fclose_ff(file_id, 1, estack_id); assert(0 == ret); @@ -307,5 +420,8 @@ main(int argc, char **argv) MPI_Finalize(); + if (ret < 0) { + + } return 0; } |