author      Richard Warren <Richard.Warren@hdfgroup.org>    2018-10-26 13:06:21 (GMT)
committer   Richard Warren <Richard.Warren@hdfgroup.org>    2018-10-26 13:06:21 (GMT)
commit      d8d404e6ef1dfaaa061b20084e044c522d1fba81 (patch)
tree        3f055fc65484acb5af3c3165b65c778fc3789811 /src
parent      341c6bcb1ee3de55ac8e4929246e810955533936 (diff)
download    hdf5-d8d404e6ef1dfaaa061b20084e044c522d1fba81.zip
            hdf5-d8d404e6ef1dfaaa061b20084e044c522d1fba81.tar.gz
            hdf5-d8d404e6ef1dfaaa061b20084e044c522d1fba81.tar.bz2
Create a parallel indexing branch
Diffstat (limited to 'src')
-rw-r--r--  src/H5Dint.c         |  35
-rw-r--r--  src/H5Dio.c          |   2
-rw-r--r--  src/H5Dscatgath.c    |   1
-rw-r--r--  src/H5Q.c            | 203
-rw-r--r--  src/H5Spkg.h         |  13
-rw-r--r--  src/H5X.c            | 429
-rw-r--r--  src/H5Xalacrity.c    |  17
-rw-r--r--  src/H5Xdummy.c       |  15
-rw-r--r--  src/H5Xfastbit.c     | 791
-rw-r--r--  src/H5Xmeta_dummy.c  |   1
-rw-r--r--  src/H5Xprivate.h     |   5
-rw-r--r--  src/H5Xpublic.h      |  19
-rw-r--r--  src/Makefile.am      |   2
13 files changed, 1484 insertions, 49 deletions
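
As a quick orientation before the diff: the new H5X.c code below builds one MPI group per compute node by hand, using gethostid(), an MPI_Allgather() of (hostid, rank) tuples, qsort(), and MPI_Comm_split(), so that group-rank 0 on each node can collect its peers' query results. The following is a minimal, standalone sketch of that node-grouping idea; it is not taken from the patch, and it uses MPI-3's MPI_Comm_split_type() only as a shortcut for what the patch assembles manually.

/*
 * Illustrative only: obtain a per-node communicator, which is what
 * H5Xinitialize_parallel_query()/create_group_comms() in this patch
 * achieve via gethostid() + MPI_Allgather() + MPI_Comm_split().
 */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Comm node_comm;
    int world_rank, node_rank, node_size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    /* One communicator per shared-memory node (per host). */
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED,
                        world_rank, MPI_INFO_NULL, &node_comm);
    MPI_Comm_rank(node_comm, &node_rank);
    MPI_Comm_size(node_comm, &node_size);

    /* node_rank 0 would act as the writer for its node's query results. */
    printf("world rank %d -> node rank %d of %d\n",
           world_rank, node_rank, node_size);

    MPI_Comm_free(&node_comm);
    MPI_Finalize();
    return 0;
}
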
diff --git a/src/H5Dint.c b/src/H5Dint.c index 6596360..c85a5ba 100644 --- a/src/H5Dint.c +++ b/src/H5Dint.c @@ -3411,6 +3411,7 @@ static H5S_t * H5D__query(H5D_t *dset, const H5S_t *file_space, const H5Q_t *query, hid_t xapl_id, hid_t xxpl_id) { + H5X_class_t *idx_class = NULL; H5Q_combine_op_t combine_op; H5S_t *(*query_func)(H5D_t *dset, const H5S_t *file_space, const H5Q_t *query, hid_t xapl_id, hid_t xxpl_id); H5S_t *ret_value = NULL; @@ -3426,10 +3427,35 @@ H5D__query(H5D_t *dset, const H5S_t *file_space, const H5Q_t *query, hid_t xapl_ /* TODO might want to fix that to give the combined query to the plugin, * add something to check whether the plugin supports it or not */ - query_func = (combine_op == H5Q_SINGLETON) ? H5D__query_singleton : H5D__query_combined; - if (NULL == (ret_value = query_func(dset, file_space, query, xapl_id, xxpl_id))) - HGOTO_ERROR(H5E_QUERY, H5E_CANTGET, NULL, "unable to get combine operator"); + /* RAW: Can we call the plugin directly at this point? */ + idx_class = dset->shared->idx_class; + if (idx_class && query->is_combined && + (idx_class->query_types & H5X_COMPOUND_QUERY)) { + hid_t file_space_id = H5S_ALL; + hid_t space_id = FAIL; + /* Index associated to dataset so use it */ + if (NULL == idx_class->idx_class->data_class.query) + HGOTO_ERROR(H5E_INDEX, H5E_BADVALUE, NULL, "plugin query callback not defined"); + /* Open index if not opened yet */ + if (!dset->shared->idx_handle && (FAIL == H5D_open_index(dset, xapl_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTOPENOBJ, NULL, "cannot open index"); + + /* Create ID for file_space */ + if (file_space && (FAIL == (file_space_id = H5I_register(H5I_DATASPACE, file_space, FALSE)))) + HGOTO_ERROR(H5E_ATOM, H5E_CANTREGISTER, NULL, "unable to register dataspace"); + /* Call plugin query */ + if (FAIL == (space_id = idx_class->idx_class->data_class.query(dset->shared->idx_handle, file_space_id, query->query_id, xxpl_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTSELECT, NULL, "cannot query index"); + + if (NULL == (ret_value = (H5S_t *) H5I_object_verify(space_id, H5I_DATASPACE))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a dataspace"); + } + else { + query_func = (combine_op == H5Q_SINGLETON) ? 
H5D__query_singleton : H5D__query_combined; + if (NULL == (ret_value = query_func(dset, file_space, query, xapl_id, xxpl_id))) + HGOTO_ERROR(H5E_QUERY, H5E_CANTGET, NULL, "unable to get combine operator"); + } done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__query */ @@ -3557,6 +3583,9 @@ H5D__query_force(H5D_t *dset, const H5S_t *file_space, const H5Q_t *query) iter_op.op_type = H5S_SEL_ITER_OP_LIB; iter_op.u.lib_op = H5D__query_force_iterate; + /* Jerome suggests change here to avoid iterations */ + + if (FAIL == H5S_select_iterate(buf, dset->shared->type, space, &iter_op, &iter_args)) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTCOMPARE, NULL, "unable to compare attribute elements"); diff --git a/src/H5Dio.c b/src/H5Dio.c index b7818b1..ef2fe63 100644 --- a/src/H5Dio.c +++ b/src/H5Dio.c @@ -285,7 +285,7 @@ H5Dwrite(hid_t dset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t xxpl_id = H5P_INDEX_XFER_DEFAULT; if (idx_class->idx_class->data_class.post_update && - (FAIL == idx_class->idx_class->data_class.post_update(idx_handle, buf, file_space_id, xxpl_id))) + (FAIL == idx_class->idx_class->data_class.post_update(idx_handle, buf, file_space_id, xxpl_id))) HGOTO_ERROR(H5E_INDEX, H5E_CANTUPDATE, FAIL, "cannot do an index post-update"); /* Calling post_update rebuilds the index and index metadata may need diff --git a/src/H5Dscatgath.c b/src/H5Dscatgath.c index 9902b7b..dc0320b 100644 --- a/src/H5Dscatgath.c +++ b/src/H5Dscatgath.c @@ -333,7 +333,6 @@ H5D__scatter_mem (const void *_tscat_buf, const H5S_t *space, for(curr_seq = 0; curr_seq < nseq; curr_seq++) { /* Get the number of bytes in sequence */ curr_len = len[curr_seq]; - HDmemcpy(buf + off[curr_seq], tscat_buf, curr_len); /* Advance offset in destination buffer */ @@ -347,7 +347,6 @@ herr_t H5Q_init(void) { herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_NOAPI(FAIL) /* FUNC_ENTER() does all the work */ @@ -374,6 +373,11 @@ H5Q__init_package(void) FUNC_ENTER_PACKAGE +#ifdef H5_HAVE_PARALLEL + if (FAIL == H5Xinitialize_parallel_query()) + printf("Unable to initialize parallel queries\n"); +#endif + /* Initialize the atom group for the QUERY IDs */ if (FAIL == H5I_register_type(H5I_QUERY_CLS)) HGOTO_ERROR(H5E_QUERY, H5E_CANTINIT, FAIL, "unable to initialize interface"); @@ -1596,6 +1600,7 @@ done: static herr_t H5Q__apply_data_elem(const H5Q_t *query, hbool_t *result, const H5T_t *type, const void *value) { + static int call_count = 0; herr_t ret_value = SUCCEED; /* Return value */ void *value_buf = NULL, *query_value_buf = NULL; H5T_t *query_type, *promoted_type; @@ -1612,6 +1617,8 @@ H5Q__apply_data_elem(const H5Q_t *query, hbool_t *result, const H5T_t *type, con HDassert((query->query.select.type == H5Q_TYPE_DATA_ELEM) || (query->query.select.type == H5Q_TYPE_ATTR_VALUE)); + call_count++; + // printf("H5Q__apply_data_elem: %d\n", call_count); /* Keep a copy of elem to work on */ if (0 == (type_size = H5T_get_size(type))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a valid size"); @@ -2178,6 +2185,191 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* H5Q__apply_index() */ + +static char * +whitespace(int level) +{ + static char spaces[80] = {0,}; + static int last = 0; + char *next; + int i; + if (last == level) return spaces; + next = spaces; + last = level; + for(i=0; i<level; i++) { + strcpy(next," "); + next += 2; + } + return spaces; +} + +static void +show_combine_op(H5Q_combine_op_t op, int level) +{ + switch(op) { + case H5Q_COMBINE_AND: + printf("%s H5Q_COMBINE_AND\n", whitespace(level)); + break; + case H5Q_COMBINE_OR: + 
printf("%s H5Q_COMBINE_OR\n", whitespace(level)); + break; + case H5Q_SINGLETON: + printf("%s H5Q_SINGLETON\n", whitespace(level)); + break; + } +} + +static void +show_cmp_value(H5T_class_t d_class, size_t len, void *value, int level) +{ + switch(d_class) { + case H5T_INTEGER: + if (len == 2) { + short *s_value = (short *)value; + printf("%s %d\n", whitespace(level), s_value[0]); + } + else { + if (len == 4) { + int *i_value = (int *)value; + printf("%s %d\n", whitespace(level), i_value[0]); + } + else { + if (len == 8) { + int64_t *l_value = (int64_t *)value; + printf("%s %lld\n", whitespace(level), l_value[0]); + } + } + } + break; + case H5T_FLOAT: + if (len == 4) { + float *f_value = (float *)value; + printf("%s %f\n", whitespace(level), f_value[0]); + } + else { + if (len == 8) { + double *d_value = (double *)value; + printf("%s %llf\n", whitespace(level), d_value[0]); + } + } + break; + case H5T_STRING: + printf("%s %s\n", whitespace(level), (char *)value); + break; + case H5T_TIME: + // break; + case H5T_BITFIELD: + // break; + case H5T_OPAQUE: + // break; + case H5T_COMPOUND: + // break; + case H5T_REFERENCE: + // break; + case H5T_ENUM: + // break; + case H5T_VLEN: + // break; + case H5T_ARRAY: + // break; + default: + printf("unsupported datatype\n"); + break; + } +} + +show_match(H5Q_match_op_t op, int level) +{ + switch(op) { + case H5Q_MATCH_EQUAL: + printf("%s H5Q_MATCH_EQUAL\n",whitespace(level)); + break; + case H5Q_MATCH_NOT_EQUAL: + printf("%s H5Q_MATCH_NOT_EQUAL\n",whitespace(level)); + break; + case H5Q_MATCH_LESS_THAN: + printf("%s H5Q_MATCH_LESS_THAN\n",whitespace(level)); + break; + case H5Q_MATCH_GREATER_THAN: + printf("%s H5Q_MATCH_GREATER_THAN\n",whitespace(level)); + break; + } +} + +static void +show_match_select(H5Q_select_t *op_selection, int level) +{ + H5T_t *d_type = op_selection->elem.data_elem.type; + H5T_class_t data_class = H5T_get_class(d_type, FALSE); + size_t data_size = op_selection->elem.data_elem.type_size; + void *value = op_selection->elem.data_elem.value; + show_match( op_selection->match_op, level ); + show_cmp_value(data_class, data_size, value, level); +} + +static void +show_query(H5Q_t *query, int indent) +{ + H5Q_type_t query_type; + H5Q_combine_op_t op_type; + H5T_t *d_type; + H5T_class_t data_class; + + int level = indent+1; + + if (FAIL == H5Q_get_type(query, &query_type)) { + printf("H5Q_get_type returned error!\n"); + } + else { + switch(query_type) { + case H5Q_TYPE_DATA_ELEM: + printf("%s H5Q_TYPE_DATA_ELEM\n",whitespace(level)); + if (query->is_combined) { + show_combine_op(query->query.combine.op, level); + show_query(query->query.combine.l_query, level); + show_query(query->query.combine.r_query, level); + } else { + show_match_select(&query->query.select, level); + } + break; + case H5Q_TYPE_ATTR_VALUE: + printf("%s H5Q_TYPE_ATTR_VALUE\n",whitespace(level)); + d_type = query->query.select.elem.data_elem.type; + data_class = H5T_get_class(d_type, FALSE); + show_match(query->query.select.match_op, level); + show_cmp_value(data_class, query->query.select.elem.data_elem.type_size, query->query.select.elem.data_elem.value, level); + break; + case H5Q_TYPE_ATTR_NAME: + printf("%s H5Q_TYPE_ATTR_NAME\n",whitespace(level)); + show_match(query->query.select.match_op, level); + printf("%s attr_name = %s\n", whitespace(level), query->query.select.elem.attr_name.name); + break; + case H5Q_TYPE_LINK_NAME: + printf("%s H5Q_TYPE_LINK_NAME\n", whitespace(level)); + show_match(query->query.select.match_op, level); + printf("%s link_name = 
%s\n", whitespace(level), query->query.select.elem.link_name.name); + break; + case H5Q_TYPE_MISC: + printf("%s H5Q_TYPE_MISC\n",whitespace(level)); + if (query->is_combined) { + show_combine_op(query->query.combine.op, level); + show_query(query->query.combine.l_query, level); + show_query(query->query.combine.r_query, level); + } + break; + } + } +} + + +static int visualize_query = 0; + +void +H5Q_enable_visualize_query(void) +{ + visualize_query++; +} + /*------------------------------------------------------------------------- * Function: H5Q__apply_iterate * @@ -2190,10 +2382,12 @@ done: static herr_t H5Q__apply_iterate(hid_t oid, const char *name, const H5O_info_t *oinfo, void *op_data) { + static int count = 0; H5Q_apply_arg_t *args = (H5Q_apply_arg_t *) op_data; H5Q_type_t query_type; herr_t ret_value = SUCCEED; /* Return value */ + FUNC_ENTER_NOAPI_NOINIT HDassert(args->query); @@ -2211,7 +2405,12 @@ H5Q__apply_iterate(hid_t oid, const char *name, const H5O_info_t *oinfo, void *o H5Q_apply_arg_t args1, args2; H5Q_view_t view1 = H5Q_VIEW_INITIALIZER(view1), view2 = H5Q_VIEW_INITIALIZER(view2); unsigned result1 = 0, result2 = 0; - + if (visualize_query > 0) { + show_query(args->query->query.combine.l_query, 0); + show_query(args->query->query.combine.r_query, 0); + puts("--------------\n"); + visualize_query = 0; + } if (FAIL == H5Q_get_combine_op(args->query, &op_type)) HGOTO_ERROR(H5E_QUERY, H5E_CANTGET, FAIL, "unable to get combine op"); diff --git a/src/H5Spkg.h b/src/H5Spkg.h index fbcb4d1..247030a 100644 --- a/src/H5Spkg.h +++ b/src/H5Spkg.h @@ -91,6 +91,18 @@ typedef struct { H5S_pnt_node_t *tail; /* Pointer to tail of point list */ } H5S_pnt_list_t; +/* */ +struct H5S_index_group_t { + hsize_t n_elems; /* Number of index elements in the group */ + hsize_t *group; /* Pointer to a collection of indices */ + struct H5S_index_group_t *next; /* pointer to next group in list */ +}; + +typedef struct { + struct H5S_index_group_t *head; /* Pointer to head of point list */ + struct H5S_index_group_t *tail; /* Pointer to tail of point list */ +} H5S_indexgrp_list_t; + /* Information about new-style hyperslab spans */ /* Information a particular hyperslab span */ @@ -219,6 +231,7 @@ typedef struct { union { H5S_pnt_list_t *pnt_lst; /* List of selected points (order is important) */ H5S_hyper_sel_t *hslab; /* Info about hyperslab selections */ + H5S_indexgrp_list_t *index_grp; /* List of local index groups */ } sel_info; } H5S_select_t; @@ -459,7 +459,7 @@ H5Xcreate(hid_t loc_id, unsigned plugin_id, hid_t xcpl_id) herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_API(FAIL) - H5TRACE3("e", "iIui", scope_id, plugin_id, xcpl_id); + H5TRACE3("e", "iIui", loc_id, plugin_id, xcpl_id); /* Check args */ if (plugin_id > H5X_PLUGIN_MAX) @@ -631,7 +631,7 @@ H5Xremove(hid_t loc_id, unsigned plugin_id) herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_API(FAIL) - H5TRACE2("e", "iIu", scope_id, plugin_id); + H5TRACE2("e", "iIu", loc_id, plugin_id); /* Check args */ if (plugin_id > H5X_PLUGIN_MAX) @@ -716,7 +716,7 @@ H5Xget_count(hid_t loc_id, hsize_t *idx_count) herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_API(FAIL) - H5TRACE2("e", "i*h", scope_id, idx_count); + H5TRACE2("e", "i*h", loc_id, idx_count); if (FAIL == loc_id) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a location") @@ -801,7 +801,7 @@ H5Xget_size(hid_t loc_id) hsize_t ret_value = 0; /* Return value */ FUNC_ENTER_API(0) - H5TRACE1("h", "i", scope_id); + H5TRACE1("h", "i", loc_id); if (FAIL == loc_id) 
HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, 0, "not a location") @@ -867,3 +867,424 @@ H5X_get_size(hid_t loc_id, hsize_t *idx_size) done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5X_get_size() */ + +#ifdef H5_HAVE_PARALLEL +/*------------------------------------------------------------------------- + * Function: H5Xinitialize_parallel_query + * (and support functions) + * + * Purpose: The intent of this collection of functions is two-fold: + * 1) provide a mechanism to define parallel access to the + * requested HDF file and datasets. + * 2) provide the functions to define MPI groups which contain + * all processes allocated to the same 'node'. + * On a machine such as 'cori' at LBNL, the MAX number of + * processes on a single node will be 64 if no additional + * action is taken. + * + * Once an MPI group is defined, all non-rank-0 processes + * will send their query results to the group-rank-0 process + * for writing to a file. + * + *------------------------------------------------------------------------- + */ +#define HR_RANK(R) ((int)hr_view[(R)].rank & 0x3FFFFFFF) +#define HR_ID(R) (hr_view[(R)].hostid) + +typedef struct { + int hostid; + int rank; +} hr_tuple; + +static int enable_parallel_queries = 0; +static int g_mpi_rank = -1; +static int g_mpi_size = -1; + +static int g_local_peer_count = -1; +static int g_local_peer_rank = -1; + +static int g_group_peer_count = -1; +static int g_group_peer_rank = -1; +static int g_group_id = -1; + +static int *layout=0; +static int *_ranks=0; +static int *_nodes=0; +static int *_peercounts=0; + +static MPI_Comm query_group_comm = MPI_COMM_NULL; + +static int +cmphostid(const void *h1, const void *h2) +{ + return (*(int *)h1 > *(int *)h2); +} + +static int +_get_nodecount(void) +{ + static int nodecount = 0; + if (nodecount) return nodecount; + + if (_ranks && _nodes && _peercounts && layout) { + int i, basei=0; + hr_tuple *hr_view = (hr_tuple *)layout; + int checkid = HR_ID(0); + _nodes[0] = 0; + + for(i=0; i < g_mpi_size; i++) { + _ranks[i] = HR_RANK(i); + if (HR_ID(i) != checkid) { + checkid = HR_ID(i); + _peercounts[nodecount] = i - basei; + basei = i; + _nodes[nodecount++] = i; + } + } + + // We're done looking at every node, so + // now we just fill in the peercount for the last node. 
+ if (nodecount) { + _peercounts[nodecount] = g_mpi_size - basei; + } + else _peercounts[nodecount] = g_mpi_size; + } + return ++nodecount; +} + +static int +_get_local_peer_count(int hostid) +{ + int peercount = 0; + if (g_mpi_size > 1) { + int i, nodecount = _get_nodecount(); + hr_tuple *hr_view = (hr_tuple *)layout; + for(i=0; i < nodecount; i++) { + int sindex = _nodes[i]; + if (HR_ID(sindex) == hostid) { + g_group_id = i; + return peercount = _peercounts[i]; + } + } + } + return peercount = g_mpi_size; +} + +static int +_get_local_peer_rank(int hostid) +{ + if (g_mpi_size > 1) { + int i, nodecount = _get_nodecount(); + hr_tuple *hr_view = (hr_tuple *)layout; + for(i=0; i < nodecount; i++) { + int k, sindex = _nodes[i]; + if (HR_ID(sindex) == hostid) { + for(k=0; i<_peercounts[i]; k++) { + if (HR_RANK(sindex+k) == g_mpi_rank) + return k; + } + } + } + } + return g_mpi_rank; +} + +static +int getLocalInfo(int hostid) +{ + int rank = g_mpi_rank; + int size = g_mpi_size; + int localinfo[2] = {hostid, rank}; + if (size > 1) { + /* Max number of nodes is the same as the number of ranks */ + _nodes = (int *)calloc(size,sizeof(int)); + _ranks = (int *)calloc(size,sizeof(int)); + _peercounts = (int *)calloc(size,sizeof(int)); + layout = (int *)calloc(size*2,sizeof(int)); + + /* Exchange host/rank info */ + if( MPI_Allgather(localinfo,2, MPI_INT,layout,2,MPI_INT,MPI_COMM_WORLD) != MPI_SUCCESS) { + printf("ERROR! MPI_Allgather failure\n"); + return -1; + } + /* Possibly sort to get the info about local peers */ + if (size > 2) qsort(layout,size,sizeof(int)*2,cmphostid); + g_group_peer_count = _get_nodecount(); + g_local_peer_count = _get_local_peer_count(hostid); + g_local_peer_rank = _get_local_peer_rank(hostid); + } + return 1; +} + +static +void gather_topology_info(void) +{ + int isInitialized = 0; + int hostid = gethostid(); + MPI_Initialized(&isInitialized); + if (!isInitialized) { + enable_parallel_queries = 0; + return; + } + MPI_Comm_rank(MPI_COMM_WORLD, &g_mpi_rank); + MPI_Comm_size(MPI_COMM_WORLD, &g_mpi_size); + if (g_mpi_size == 1) { + g_group_peer_count = 1; + g_local_peer_count = 1; + g_local_peer_rank = 0; + enable_parallel_queries = 0; + return; + } + + g_local_peer_count = getLocalInfo(hostid); + enable_parallel_queries = 1; +} + +static +herr_t create_group_comms(void) +{ + herr_t ret_value = SUCCEED; /* Return value */ + int query_rank = -1; + int query_size = -1; + + /* The enable flag is set via the gather_topolgy_info() function above. + * NOTE: We won't initialize MPI, nor enable parallel queries if the user + * hasn't already initialized MPI themselves... + */ + if (!enable_parallel_queries) + return SUCCEED; + + if (g_group_peer_count == 1) { + if (MPI_Comm_dup(MPI_COMM_WORLD,&query_group_comm) != MPI_SUCCESS) + return FAIL; + return SUCCEED; + } + + if (g_group_id < 0) { + printf("ERROR: group ids have not been set!\n"); + return FAIL; + } + if (MPI_Comm_split(MPI_COMM_WORLD, g_group_id, g_group_peer_rank, &query_group_comm) != MPI_SUCCESS) { + printf("ERROR! 
MPI_Comm_split failed\n"); + return FAIL; + } + if (MPI_Comm_rank(query_group_comm,&query_rank) != MPI_SUCCESS) { + printf("ERROR: Query comm is non-functional!\n"); + return FAIL; + } + if (MPI_Comm_size(query_group_comm,&query_size) != MPI_SUCCESS) { + printf("ERROR: Query comm is non-functional!\n"); + return FAIL; + } + return ret_value; +} + + +/*------------------------------------------------------------------------- + * Function: H5Xinitialize_parallel_query + * + * Return: Non-negative on success/Negative on failure + *------------------------------------------------------------------------- + */ +herr_t +H5Xinitialize_parallel_query(void) +{ + static int initialized = 0; + gather_topology_info(); + return create_group_comms(); +} + +int +H5Xparallel_queries_enabled(void) +{ + return enable_parallel_queries; +} + +int +H5Xparallel_rank(void) +{ + return g_mpi_rank; +} + +int +H5Xparallel_size(void) +{ + return g_mpi_size; +} + +herr_t +H5Xallgather_by_size(void *alldata, int nelems, int typesize) +{ + /* Exchange */ + int i, size; + if (typesize == 8) { + int64_t *longdata = (int64_t *)alldata; + int64_t *src = &longdata[g_mpi_rank * nelems]; + /* Copy nelems (allgather doesn't like aliasing between input and output) */ + int64_t *my_longdata = (int64_t *)malloc(nelems * sizeof(int64_t)); + for(i=0; i< nelems; i++) my_longdata[i] = src[i]; + MPI_Allgather(my_longdata, nelems, MPI_LONG, longdata, nelems, MPI_LONG, MPI_COMM_WORLD); + free(my_longdata); + } + else if (typesize == 4) { + int *intdata = (int *)alldata; + int *src = &intdata[g_mpi_rank * nelems]; + int *my_intdata = (int *)malloc(nelems * sizeof(int)); + for(i=0; i< nelems; i++) my_intdata[i] = src[i]; + MPI_Allgather(my_intdata, nelems, MPI_INT, intdata, nelems, MPI_INT, MPI_COMM_WORLD); + free(my_intdata); + } + else if (typesize == 2) { + short *shortdata = (short *)alldata; + int *src = &shortdata[g_mpi_rank * nelems]; + short *my_shortdata = (short *)malloc(nelems * sizeof(short)); + for(i=0; i< nelems; i++) my_shortdata[i] = src[i]; + MPI_Allgather(my_shortdata, 1, MPI_SHORT, shortdata, 1, MPI_SHORT, MPI_COMM_WORLD); + free(my_shortdata); + } + else if (typesize == 1) { + char *bytedata = (char *)alldata; + char *src = &bytedata[g_mpi_rank * nelems]; + char *my_bytedata = (short *)malloc(nelems * sizeof(char)); + for(i=0; i< nelems; i++) my_bytedata[i] = src[i]; + MPI_Allgather(my_bytedata, 1, MPI_BYTE, bytedata, 1, MPI_BYTE, MPI_COMM_WORLD); + free(my_bytedata); + } + else return FAIL; /* All non-supported lengths will fail */ + return SUCCEED; +} + +/* + * Helper function for H5Xslab_set + */ +#define BYROW 1 /* divide into slabs of rows */ +#define BYCOL 2 /* divide into blocks of columns */ +#define BYLASTDIM 3 /* For 3D and higher, we get contiguous blocks */ +#define ZROW 4 /* same as BYCOL except process 0 gets 0 rows */ +#define ZCOL 5 /* same as BYCOL except process 0 gets 0 columns */ + +static void +slab_set(int mpi_rank, int mpi_size, int ndims, hsize_t dims[], hsize_t start[], hsize_t count[], + hsize_t stride[], hsize_t block[], int mode) +{ + int i, lastdim = ndims-1; + switch (mode){ + case BYROW: + /* Each process takes a slabs of rows. */ + block[0] = dims[0]/mpi_size; + start[0] = mpi_rank*block[0]; + for(i=1; i < ndims; i++) { + block[i] = dims[i]; + stride[i] = block[i]; + count[i] = 1; + start[i] = 0; + } + break; + case BYCOL: + /* Each process takes a block of columns. 
*/ + for(i=0; i < ndims; i++) { + if (i == 1) { + block[1] = dims[1]/mpi_size; + start[1] = mpi_rank * block[1]; + } + else { + block[i] = dims[i]; + start[i] = 0; + } + stride[i] = block[i]; + count[i] = 1; + } + break; + case BYLASTDIM: + for(i=0; i < lastdim; i++) { + block[i] = dims[i]; + stride[i] = block[i]; + count[i] = 1; + start[i] = 0; + } + block[lastdim] = dims[lastdim]/mpi_size; + stride[lastdim] = block[lastdim]; + count[lastdim] = 1; + start[lastdim] = mpi_rank*block[lastdim]; + break; + case ZROW: + /* Similar to BYROW except process 0 gets 0 row */ + /* Each process takes a slabs of rows. */ + block[0] = (mpi_rank ? dims[0]/mpi_size : 0); + start[0] = (mpi_rank ? mpi_rank*block[0] : 1); + for(i=1; i < ndims; i++) { + block[i] = dims[i]; + stride[i] = block[i]; + count[i] = 1; + start[i] = 0; + } + break; + case ZCOL: + /* Similar to BYCOL except process 0 gets 0 column */ + /* Each process takes a block of columns. */ + for(i=0; i < ndims; i++) { + if (i == 1) { + block[1] = (mpi_rank ? dims[1]/mpi_size : 0); + start[1] = (mpi_rank ? mpi_rank * block[1] : 1); + } + else { + block[i] = dims[i]; + start[i] = 0; + } + stride[i] = block[i]; + count[i] = 1; + } + break; + default: + /* Unknown mode. Set it to cover the whole dataset. */ + printf("unknown slab_set mode (%d)\n", mode); + for(i=0; i < ndims; i++) { + block[i] = dims[i]; + stride[i] = block[i]; + count[i] = 1; + start[i] = 0; + } + break; + } +} + +/*------------------------------------------------------------------------- + * Function: H5Xslab_set + * + * Purpose: Prepare to call H5Sselect_hyperslab(). + * Returns: The number of dimensions of each container, i.e. all + * output arguments have 'ndims' number of initialized + * values. + *------------------------------------------------------------------------- + */ +int +H5Xslab_set(hid_t filespace_id, hsize_t **start, hsize_t **count, hsize_t **stride, hsize_t **block) +{ + int ds_ndims; + hsize_t *temp, *dims; + + HDassert(start); + HDassert(count); + HDassert(count); + HDassert(block); + + ds_ndims = H5Sget_simple_extent_ndims(filespace_id); + if (ds_ndims > 0) { + dims = (hsize_t *)calloc(ds_ndims, sizeof(hsize_t)); + temp = (hsize_t *)calloc(ds_ndims * 4, sizeof(hsize_t)); + if (dims && temp) { + *start = temp; + *count = *start + ds_ndims; + *stride = *count + ds_ndims; + *block = *stride + ds_ndims; + + H5Sget_simple_extent_dims(filespace_id, dims, NULL); + slab_set(g_mpi_rank, g_mpi_size, ds_ndims, dims, *start, *count, *stride, *block, BYLASTDIM); + free(dims); + } + else return -1; + } + return ds_ndims; +} + +#endif /* H5_HAVE_PARALLEL */ diff --git a/src/H5Xalacrity.c b/src/H5Xalacrity.c index 5e4101e..338ed1a 100644 --- a/src/H5Xalacrity.c +++ b/src/H5Xalacrity.c @@ -207,12 +207,7 @@ extern _Bool findBinRange1C(const ALMetadata *meta, ALUnivariateQuery *query, /* Local Variables */ /*******************/ -/* Alacrity index class */ -const H5X_class_t H5X_ALACRITY[1] = {{ - H5X_CLASS_T_VERS, /* (From the H5Xpublic.h header file) */ - H5X_PLUGIN_ALACRITY, /* (Or whatever number is assigned) */ - "ALACRITY index plugin", /* Whatever name desired */ - H5X_TYPE_DATA_ELEM, /* This plugin operates on dataset elements */ +static H5X_idx_class_t idx_class = {.data_class = { H5X_alacrity_create, /* create */ H5X_alacrity_remove, /* remove */ H5X_alacrity_open, /* open */ @@ -225,6 +220,16 @@ const H5X_class_t H5X_ALACRITY[1] = {{ NULL /* get_size */ }}; +/* Alacrity index class */ +const H5X_class_t H5X_ALACRITY[1] = {{ + H5X_CLASS_T_VERS, /* (From the 
H5Xpublic.h header file) */ + H5X_PLUGIN_ALACRITY, /* (Or whatever number is assigned) */ + "ALACRITY index plugin", /* Whatever name desired */ + H5X_TYPE_DATA_ELEM, /* This plugin operates on dataset elements */ + FALSE, /* plugin doesn't support multiple condition queries */ + &idx_class /* Index class */ +}}; + /*------------------------------------------------------------------------- * Function: H5X__alacrity_init * diff --git a/src/H5Xdummy.c b/src/H5Xdummy.c index cd902e0..1a69dbf 100644 --- a/src/H5Xdummy.c +++ b/src/H5Xdummy.c @@ -133,6 +133,7 @@ const H5X_class_t H5X_DUMMY[1] = {{ H5X_PLUGIN_DUMMY, /* (Or whatever number is assigned) */ "dummy index plugin", /* Whatever name desired */ H5X_TYPE_DATA, /* This plugin operates on dataset elements */ + H5X_SIMPLE_QUERY|H5X_COMPOUND_QUERY, /* plugin does support multiple condition (compound) queries */ &idx_class /* Index class */ }}; @@ -202,6 +203,7 @@ static void * H5X_dummy_create(hid_t dataset_id, hid_t H5_ATTR_UNUSED xcpl_id, hid_t H5_ATTR_UNUSED xapl_id, size_t *metadata_size, void **metadata) { + static int entered_count=0; H5X_dummy_t *dummy = NULL; hid_t file_id = FAIL, type_id, space_id; void *buf = NULL; @@ -223,6 +225,8 @@ H5X_dummy_create(hid_t dataset_id, hid_t H5_ATTR_UNUSED xcpl_id, hid_t H5_ATTR_U if (FAIL == (space_id = H5Dget_space(dataset_id))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, NULL, "can't get dataspace from dataset"); + entered_count++; + // printf("H5X_dummy_create: entered=%d\tdataset=%lld\n", entered_count, dataset_id); /* Get data from dataset */ if (FAIL == H5X__dummy_read_data(dataset_id, &buf, &buf_size)) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, NULL, "can't get data from dataset"); @@ -317,6 +321,7 @@ static void * H5X_dummy_open(hid_t dataset_id, hid_t H5_ATTR_UNUSED xapl_id, size_t metadata_size, void *metadata) { + static int entered_count=0; hid_t file_id; H5X_dummy_t *dummy = NULL; void *ret_value = NULL; /* Return value */ @@ -333,6 +338,7 @@ H5X_dummy_open(hid_t dataset_id, hid_t H5_ATTR_UNUSED xapl_id, size_t metadata_s if (NULL == (dummy = (H5X_dummy_t *) H5MM_malloc(sizeof(H5X_dummy_t)))) HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, NULL, "can't allocate dummy struct"); + entered_count++; dummy->dataset_id = dataset_id; dummy->idx_token_size = metadata_size; if (NULL == (dummy->idx_token = H5MM_malloc(dummy->idx_token_size))) @@ -346,7 +352,7 @@ H5X_dummy_open(hid_t dataset_id, hid_t H5_ATTR_UNUSED xapl_id, size_t metadata_s HGOTO_ERROR(H5E_INDEX, H5E_CANTOPENOBJ, NULL, "can't open anonymous dataset"); ret_value = dummy; - + // printf("H5X_dummy_open: entered=%d\tdataset=%lld\n", entered_count, dataset_id); done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5X_dummy_open() */ @@ -549,6 +555,13 @@ H5X_dummy_query(void *idx_handle, hid_t H5_ATTR_UNUSED dataspace_id, hid_t query /* iterate over every element and apply the query on it. If the query is not satisfied, then remove it from the query selection */ + /* Fix-ME! 
don't iterate */ +#if 0 + if (H5Xgather_local_indices(buf, type_id, space_id, H5X__dummy_get_query_data_cb, &udata) >= 0) { + printf("H5Xgather_local_indices returned success\n"); + } + else +#endif if (H5Diterate(buf, type_id, space_id, H5X__dummy_get_query_data_cb, &udata) < 0) HGOTO_ERROR(H5E_INDEX, H5E_CANTCOMPUTE, FAIL, "failed to compute buffer size"); diff --git a/src/H5Xfastbit.c b/src/H5Xfastbit.c index 0d37b06..59c4b88 100644 --- a/src/H5Xfastbit.c +++ b/src/H5Xfastbit.c @@ -24,7 +24,7 @@ /* Headers */ /***********/ #include "H5private.h" /* Generic Functions */ -#include "H5Xprivate.h" /* Index */ +#include "H5Xprivate.h" /* Index */ #include "H5Eprivate.h" /* Error handling */ #include "H5Iprivate.h" /* IDs */ #include "H5MMprivate.h" /* Memory management */ @@ -34,6 +34,12 @@ #include "H5Sprivate.h" /* TODO using private headers but could use public ones */ +/* TODO move these includes to someplace where all plugins get it (by default) */ +#ifdef H5_HAVE_PARALLEL +#include "H5FDmpi.h" +#include "H5FDmpio.h" +#endif + #ifdef H5_HAVE_FASTBIT /** * Header file that defines an in-memory C API for accessing the querying @@ -45,10 +51,11 @@ /****************/ /* Local Macros */ /****************/ -//#define H5X_FASTBIT_DEBUG +// #define H5X_FASTBIT_DEBUG #ifdef H5X_FASTBIT_DEBUG -#define H5X_FASTBIT_DEBUG_LVL 6 +// #define H5X_FASTBIT_DEBUG_LVL 6 +#define H5X_FASTBIT_DEBUG_LVL 2 #define H5X_FASTBIT_LOG_DEBUG(...) do { \ fprintf(stdout, " # %s(): ", __func__); \ fprintf(stdout, __VA_ARGS__); \ @@ -63,6 +70,12 @@ #define H5X_FASTBIT_MAX_NAME 1024 +struct fastbit_meta_collection { + int64_t nkeys; + int64_t noffsets; + int64_t nbitmaps; +}; + /******************/ /* Local Typedefs */ /******************/ @@ -71,6 +84,7 @@ typedef struct H5X_fastbit_t { hid_t file_id; /* ID of the indexed dataset file */ hid_t dataset_id; /* ID of the indexed dataset */ + unsigned dataset_ndims; /* dataset number of dimensions */ hsize_t *dataset_dims; /* dataset dimensions */ hsize_t *dataset_down_dims; /* dataset downed dimensions */ @@ -91,6 +105,12 @@ typedef struct H5X_fastbit_t { char column_name[H5X_FASTBIT_MAX_NAME]; + /* For parallel query/indexing support */ + int nranks; + hid_t filespace_id; + hid_t memspace_id; + hid_t index_info_group_id; + hbool_t idx_reconstructed; } H5X_fastbit_t; @@ -99,11 +119,23 @@ struct H5X_fastbit_scatter_info { size_t src_buf_size; /* Remaining number of elements to return */ }; +typedef struct H5X_fastbit_list { + hid_t dataset_id; + H5X_fastbit_t *fastbit_info; + struct H5X_fastbit_list *next; +} H5X_fastbit_list_t; + + +static H5X_fastbit_list_t *H5X_fastbit_current_datasets = NULL; + /********************/ /* Local Prototypes */ /********************/ static H5X_fastbit_t * +H5X__get_fastbit(hid_t dataset_id); + +static H5X_fastbit_t * H5X__fastbit_init(hid_t dataset_id); static herr_t @@ -123,6 +155,13 @@ static herr_t H5X__fastbit_build_index(H5X_fastbit_t *fastbit); static herr_t +H5X__fastbit_define_dataset(H5X_fastbit_t *fastbit, int typeIndex, struct fastbit_meta_collection *gatherInfo, + hsize_t *array_size, hsize_t *offset, hid_t *space_id, hid_t *memspace_id); + +static herr_t +H5X__fastbit_build_parallel_index(H5X_fastbit_t *fastbit); + +static herr_t H5X__fastbit_serialize_metadata(H5X_fastbit_t *fastbit, void *buf, size_t *buf_size); @@ -130,6 +169,9 @@ static herr_t H5X__fastbit_deserialize_metadata(H5X_fastbit_t *fastbit, void *buf); static herr_t +H5X__fastbit_reconstruct_parallel_index(H5X_fastbit_t *fastbit); + +static herr_t 
H5X__fastbit_reconstruct_index(H5X_fastbit_t *fastbit); static double @@ -206,10 +248,80 @@ const H5X_class_t H5X_FASTBIT[1] = {{ H5X_PLUGIN_FASTBIT, /* (Or whatever number is assigned) */ "FASTBIT index plugin", /* Whatever name desired */ H5X_TYPE_DATA, /* This plugin operates on dataset elements */ + H5X_SIMPLE_QUERY, /* plugin does NOT support multiple condition (compound) queries */ &idx_class /* Index class */ }}; /*------------------------------------------------------------------------- + * Function: H5X__fastbit_add_to_list + * + * Purpose: Creates and then adds a H5X_fastbit_list_t entry + * to a list of known datasets. + *------------------------------------------------------------------------- + */ + +static H5X_fastbit_list_t * +H5X__fastbit_add_to_list(H5X_fastbit_t *fastbit_info) +{ + H5X_fastbit_list_t *ret_value = NULL; + H5X_fastbit_list_t *new_entry = NULL; + + FUNC_ENTER_NOAPI_NOINIT + HDassert(fastbit_info); + + if (NULL == (new_entry = (H5X_fastbit_list_t *)H5MM_malloc(sizeof(H5X_fastbit_list_t)))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, NULL, "can't allocate FastBit list struct"); + + new_entry->dataset_id = fastbit_info->dataset_id; + new_entry->fastbit_info = fastbit_info; + new_entry->next = H5X_fastbit_current_datasets; + H5X_fastbit_current_datasets = new_entry; + ret_value = new_entry; + + done: + FUNC_LEAVE_NOAPI(ret_value) +} + +/*------------------------------------------------------------------------- + * Function: H5X__fastbit_remove_from_list + * + * Purpose: Creates and then adds a H5X_fastbit_list_t entry + * to a list of known datasets. + *------------------------------------------------------------------------- + */ +static void H5X__fastbit_remove_from_list(hid_t dataset_id) +{ + H5X_fastbit_list_t *thisEntry = H5X_fastbit_current_datasets; + H5X_fastbit_list_t *previous = NULL; + while (thisEntry && thisEntry->dataset_id != dataset_id) { + previous = thisEntry; + thisEntry = thisEntry->next; + } + if (thisEntry == H5X_fastbit_current_datasets) + H5X_fastbit_current_datasets = thisEntry->next; + if (previous) previous->next = thisEntry->next; + if (thisEntry) H5MM_free(thisEntry); +} + +/*------------------------------------------------------------------------- + * Function: H5X__get_fastbit + * + * Purpose: Searchs the list of known datasets and returns a + * H5X_fastbit_t pointer if it contains the requested dataset_id. + *------------------------------------------------------------------------- + */ +static H5X_fastbit_t * +H5X__get_fastbit(hid_t dataset_id) +{ + H5X_fastbit_list_t *nextEntry = H5X_fastbit_current_datasets; + while(nextEntry) { + if (nextEntry->dataset_id == dataset_id) return nextEntry->fastbit_info; + nextEntry = nextEntry->next; + } + return NULL; +} + +/*------------------------------------------------------------------------- * Function: H5X__fastbit_init * * Purpose: Configure and set up and the FastBit encoder. 
@@ -257,6 +369,11 @@ H5X__fastbit_init(hid_t dataset_id) H5X_FASTBIT_LOG_DEBUG("Dataset Name : %s", dataset_name); sprintf(fastbit->column_name, "array%u", H5X__fastbit_hash_string(dataset_name)); + fastbit->nranks = 0; + fastbit->filespace_id = FAIL; + fastbit->memspace_id = FAIL; + fastbit->index_info_group_id = FAIL; + fastbit->idx_reconstructed = FALSE; /* Initialize FastBit (no config file for now) */ @@ -288,6 +405,10 @@ H5X__fastbit_init(hid_t dataset_id) ret_value = fastbit; +#ifdef H5_HAVE_PARALLEL + H5X__fastbit_add_to_list(fastbit); +#endif + done: if (space_id != FAIL) H5Sclose(space_id); @@ -309,7 +430,6 @@ H5X__fastbit_term(H5X_fastbit_t *fastbit) herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_NOAPI_NOINIT - if (!fastbit) HGOTO_DONE(SUCCEED); @@ -333,6 +453,9 @@ H5X__fastbit_term(H5X_fastbit_t *fastbit) HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close opaque type"); /* Close anonymous datasets */ + if ((FAIL != fastbit->index_info_group_id) && + (FAIL == H5Dclose(fastbit->index_info_group_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close anonymous dataset for index"); if ((FAIL != fastbit->keys_id) && (FAIL == H5Dclose(fastbit->keys_id))) HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close anonymous dataset for index"); @@ -349,6 +472,10 @@ H5X__fastbit_term(H5X_fastbit_t *fastbit) /* Free FastBit resources */ fastbit_cleanup(); +#ifdef H5_HAVE_PARALLEL + H5X__fastbit_remove_from_list(fastbit->dataset_id); +#endif + H5MM_free(fastbit); done: @@ -443,6 +570,9 @@ H5X__fastbit_read_data(hid_t dataset_id, const char *column_name, void **buf, { herr_t ret_value = SUCCEED; /* Return value */ hid_t type_id = FAIL, space_id = FAIL; + hid_t mem_space = H5S_ALL; + hid_t file_space = H5P_DEFAULT;; + size_t nelmts, elmt_size; void *data = NULL; size_t data_size; @@ -450,23 +580,68 @@ H5X__fastbit_read_data(hid_t dataset_id, const char *column_name, void **buf, FUNC_ENTER_NOAPI_NOINIT - /* Get space info */ if (FAIL == (type_id = H5Dget_type(dataset_id))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get type from dataset"); + if (0 == (elmt_size = H5Tget_size(type_id))) + HGOTO_ERROR(H5E_DATATYPE, H5E_BADTYPE, FAIL, "invalid size of element"); if (FAIL == (space_id = H5Dget_space(dataset_id))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get dataspace from dataset"); + +#ifdef H5_HAVE_PARALLEL + if (H5Xparallel_queries_enabled() > 0) { + int i, ds_ndims = 0; + hsize_t *start = NULL, *count = NULL, *block = NULL, *stride = NULL; + H5X_fastbit_t *fastbit = H5X__get_fastbit(dataset_id); + ds_ndims = H5Xslab_set(fastbit->filespace_id, &start, &count, &stride, &block); + file_space = fastbit->filespace_id; + + if (H5Sselect_hyperslab(fastbit->filespace_id, H5S_SELECT_SET, start, stride, count, block) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "H5Sselect_hyperslab returned an error"); + + if ((mem_space = H5Screate_simple (ds_ndims, block, NULL)) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "H5Screate_simple returned an error"); + + fastbit->memspace_id = mem_space; + nelmts = block[0]; + for (i=1; i< ds_ndims; i++) + nelmts *= block[i]; + + data_size = nelmts * elmt_size; + // printf("data_size: %llu = %llu x %llu\n", data_size, nelmts, elmt_size); + + if (NULL == (data = H5MM_malloc(data_size))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate read buffer"); + + // printf("H5X__fastbit_read_data: allocated %llu bytes, data=%p\n", data_size, data); + + /* The variable 'start' is the original pointer 
allocated within H5Xslab_set. + * The additional variables: count, block, and stride are all just offsets + * from this pointer. As a result, we only need to free the start pointer. + */ + free(start); + } + else { + /* Get space info */ + if (0 == (nelmts = (size_t) H5Sget_select_npoints(space_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + + /* Allocate buffer to hold data */ + data_size = nelmts * elmt_size; + if (NULL == (data = H5MM_malloc(data_size))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate read buffer"); + } +#else if (0 == (nelmts = (size_t) H5Sget_select_npoints(space_id))) HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); - if (0 == (elmt_size = H5Tget_size(type_id))) - HGOTO_ERROR(H5E_DATATYPE, H5E_BADTYPE, FAIL, "invalid size of element"); /* Allocate buffer to hold data */ data_size = nelmts * elmt_size; if (NULL == (data = H5MM_malloc(data_size))) HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate read buffer"); +#endif /* Read data from dataset */ - if (FAIL == H5Dread(dataset_id, type_id, H5S_ALL, space_id, H5P_DEFAULT, data)) + if (FAIL == H5Dread(dataset_id, type_id, mem_space, file_space, H5P_DEFAULT, data)) HGOTO_ERROR(H5E_INDEX, H5E_READERROR, FAIL, "can't read data"); /* Convert type to FastBit type */ @@ -482,12 +657,17 @@ H5X__fastbit_read_data(hid_t dataset_id, const char *column_name, void **buf, *buf_size = data_size; done: + /* NOTE: for parallel indexing, we've made a copy of of the mem_space selection + * Should we create an actual copy and delete the version we created here? + */ if (type_id != FAIL) H5Tclose(type_id); if (space_id != FAIL) H5Sclose(space_id); - if (ret_value == FAIL) + if (ret_value == FAIL) { H5MM_free(data); + *buf = NULL; + } FUNC_LEAVE_NOAPI(ret_value) } /* H5X__fastbit_read_data() */ @@ -507,6 +687,11 @@ H5X__fastbit_build_index(H5X_fastbit_t *fastbit) hsize_t key_array_size, offset_array_size, bitmap_array_size; herr_t ret_value = SUCCEED; /* Return value */ +#ifdef H5_HAVE_PARALLEL + if (H5Xparallel_queries_enabled() > 0) + return H5X__fastbit_build_parallel_index(fastbit); +#endif + FUNC_ENTER_NOAPI_NOINIT H5X_FASTBIT_LOG_DEBUG("Calling FastBit build index"); @@ -607,6 +792,319 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* H5X__fastbit_build_index */ + +#ifdef H5_HAVE_PARALLEL +#define KEYS_ 0 +#define OFFSETS_ 1 +#define BITMAPS_ 2 +#define INFO_ 3 +#define BADINDEX_ 4 + +static herr_t +H5X__fastbit_define_dataset(H5X_fastbit_t *fastbit, int typeIndex, struct fastbit_meta_collection *gatherInfo, + hsize_t *array_size, hsize_t *offset, hid_t *space_id, hid_t *memspace_id) +{ + herr_t ret_value = SUCCEED; /* Return value */ + hid_t _space_id = FAIL, _memspace_id = H5S_ALL;; + int mpi_rank = H5Xparallel_rank(); + hsize_t count; + + FUNC_ENTER_NOAPI_NOINIT + + if ((typeIndex < 0) || (typeIndex >= BADINDEX_)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "Bad data selection index"); + if (FAIL == (_space_id = H5Screate_simple(1, array_size, NULL))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + if (typeIndex == KEYS_) { + count = (hsize_t)gatherInfo[mpi_rank].nkeys; + if (FAIL == (_memspace_id = H5Screate_simple(1, &count, NULL))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (mem) dataspace"); + if ((H5Sselect_hyperslab(_space_id, H5S_SELECT_SET, offset, NULL, &count, NULL)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't select hyperslab offsets"); + if (FAIL == 
(fastbit->keys_id = H5Dcreate_anon(fastbit->file_id, fastbit->opaque_type_id, + _space_id, H5P_DEFAULT, H5P_DEFAULT))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create anonymous dataset"); + /* Increment refcount so that anonymous dataset is persistent */ + if (FAIL == H5Oincr_refcount(fastbit->keys_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't increment dataset refcount"); + } + + else if (typeIndex == OFFSETS_) { + count = (hsize_t)gatherInfo[mpi_rank].noffsets; + if (FAIL == (_memspace_id = H5Screate_simple(1, &count, NULL))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (mem) dataspace"); + if ((H5Sselect_hyperslab(_space_id, H5S_SELECT_SET, offset, NULL, &count, NULL)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't select hyperslab offsets"); + if (FAIL == (fastbit->offsets_id = H5Dcreate_anon(fastbit->file_id, fastbit->opaque_type_id, + _space_id, H5P_DEFAULT, H5P_DEFAULT))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create anonymous dataset"); + /* Increment refcount so that anonymous dataset is persistent */ + if (FAIL == H5Oincr_refcount(fastbit->offsets_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't increment dataset refcount"); + } + + else if (typeIndex == BITMAPS_) { + count = (hsize_t)gatherInfo[mpi_rank].nbitmaps; + if (FAIL == (_memspace_id = H5Screate_simple(1, &count, NULL))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (mem) dataspace"); + if ((H5Sselect_hyperslab(_space_id, H5S_SELECT_SET, offset, NULL, &count, NULL)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't select hyperslab offsets"); + if (FAIL == (fastbit->bitmaps_id = H5Dcreate_anon(fastbit->file_id, fastbit->opaque_type_id, + _space_id, H5P_DEFAULT, H5P_DEFAULT))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create anonymous dataset"); + /* Increment refcount so that anonymous dataset is persistent */ + if (FAIL == H5Oincr_refcount(fastbit->bitmaps_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't increment dataset refcount"); + } + else if (typeIndex == INFO_) { + if (FAIL == (_memspace_id = H5Screate_simple(1, array_size, NULL))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (mem) dataspace"); + if ((H5Sselect_hyperslab(_space_id, H5S_SELECT_SET, offset, NULL, array_size, NULL)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't select hyperslab offsets"); + if (FAIL == (fastbit->index_info_group_id = H5Dcreate_anon(fastbit->file_id, fastbit->opaque_type_id, + _space_id, H5P_DEFAULT, H5P_DEFAULT))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create anonymous dataset"); + /* Increment refcount so that anonymous dataset is persistent */ + if (FAIL == H5Oincr_refcount(fastbit->index_info_group_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't increment dataset refcount"); + } + + /* Save the outputs for use in writing the actual dataset! */ + *space_id = _space_id; + *memspace_id = _memspace_id; + + done: + + FUNC_LEAVE_NOAPI(ret_value) +} + +/*------------------------------------------------------------------------- + * Function: H5X__fastbit_build_parallel_index + * + * Purpose: Same as H5X__fastbit_build_index, but parallel. + * CALLED ONLY FROM THE SERIAL CODE + * (when H5Xparallel_queries_enabled returns TRUE) + * This is an attempt to modularize the code and + * to make the mode more readable. 
+ * + * Return: Non-negative on success/Negative on failure + * + *------------------------------------------------------------------------- + */ +static herr_t +H5X__fastbit_build_parallel_index(H5X_fastbit_t *fastbit) +{ + hid_t key_space_id = FAIL, offset_space_id = FAIL, bitmap_space_id = FAIL; + hid_t info_space_id = FAIL; + hid_t key_memspace_id = H5S_ALL; + hid_t offset_memspace_id = H5S_ALL; + hid_t bitmap_memspace_id = H5S_ALL; + hid_t info_memspace_id = H5S_ALL; + hsize_t key_array_size, offset_array_size, bitmap_array_size; + herr_t ret_value = SUCCEED; /* Return value */ + hid_t dxpl_id = H5P_DEFAULT; + int i, mpi_rank, mpi_size; + int64_t nkeys_totalsize = 0, nkeys_offset = 0; + int64_t noffsets_totalsize = 0, noffsets_offset = 0; + int64_t nbitmaps_totalsize = 0, nbitmaps_offset = 0; + int64_t indexinfo_totalsize = 0, indexinfo_offset = 0; + struct fastbit_meta_collection *gatherInfo = NULL; + + FUNC_ENTER_NOAPI_NOINIT + + H5X_FASTBIT_LOG_DEBUG("Calling FastBit build parallel index"); + /* Build our process local index */ + if (0 != fastbit_iapi_build_index(fastbit->column_name, (const char *)0)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTENCODE, FAIL, "FastBit build index failed"); + + /* Get process local arrays from index */ + if (0 != fastbit_iapi_deconstruct_index(fastbit->column_name, &fastbit->keys, &fastbit->nkeys, + &fastbit->offsets, &fastbit->noffsets, &fastbit->bitmaps, &fastbit->nbitmaps)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "Can't get FastBit index arrays"); + + H5X_FASTBIT_LOG_DEBUG("FastBit build parallel index, nkeys=%lu, noffsets=%lu, nbitmaps=%lu\n", + fastbit->nkeys, fastbit->noffsets, fastbit->nbitmaps); + + /* Create key array with opaque type */ + key_array_size = fastbit->nkeys * sizeof(double); + + mpi_size = H5Xparallel_size(); + mpi_rank = H5Xparallel_rank(); + + /* We need storage for 3 elements per rank:: {nkeys, noffsets, n_bitmaps} + * + * +-------------+ + * | (RANK 0) | + * | nkeys | + * | noffsets | + * | nbitmaps | + * +-------------+ + * | | + * ~ ~ + * +-------------+ + * | (RANK N) | + * | nkeys | + * | noffsets | + * | nbitmaps | + * +-------------+ + * + */ + /* Allocate storage for 3 elements per rank */ + indexinfo_totalsize = (3 * sizeof(int64_t) * mpi_size); + if (NULL == (gatherInfo = (struct fastbit_meta_collection *)H5MM_malloc(indexinfo_totalsize))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTALLOC, FAIL, "can't allocate nkeys array"); + + gatherInfo[mpi_rank].nkeys = (int64_t)(fastbit->nkeys * sizeof(double)); + gatherInfo[mpi_rank].noffsets = (int64_t)(fastbit->noffsets * sizeof(int64_t)); + gatherInfo[mpi_rank].nbitmaps = (int64_t)(fastbit->nbitmaps * sizeof(int32_t)); + + /* Exchange info with all MPI ranks */ + /* We could use allreduce here if we only needed the total sizes + * but we also want to figure out the offsets (for the hyperslab selection). 
+ */ + if (H5Xallgather_by_size(gatherInfo, 3, sizeof(int64_t)) != SUCCEED) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't gather nkeys array"); + + /* Gather the total sizes and offset info */ + for(i=0; i < mpi_size; i++) { + if (i == mpi_rank) { + nkeys_offset = nkeys_totalsize; + noffsets_offset = noffsets_totalsize; + nbitmaps_offset = nbitmaps_totalsize; + } + nkeys_totalsize += gatherInfo[i].nkeys; + noffsets_totalsize += gatherInfo[i].noffsets; + nbitmaps_totalsize += gatherInfo[i].nbitmaps; + } + + /* Save the exchanged info for future use */ + fastbit->nranks = mpi_size; + + /* Create a transfer property to utilize MPI-IO */ + if ((dxpl_id = H5Pcreate(H5P_DATASET_XFER)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't define dataset_xfer property"); + if ((H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't set dataset_xfer to MPIO_COLLECTIVE"); + + + /* -------------- */ + /* INFOGROUP */ + /* -------------- */ + + /* We'll write out the entire 'gatherInfo' array into a dataset for later reuse */ + if (fastbit->index_info_group_id != FAIL) { + if (FAIL == H5Odecr_refcount(fastbit->index_info_group_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't decrement dataset refcount"); + if (FAIL == H5Dclose(fastbit->index_info_group_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close dataset"); + } + + if (FAIL == H5X__fastbit_define_dataset(fastbit, INFO_, gatherInfo, &indexinfo_totalsize, &indexinfo_offset, &info_space_id, &info_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + + /* -------------- */ + /* KEYS */ + /* -------------- */ + if (fastbit->keys_id != FAIL) { + if (FAIL == H5Odecr_refcount(fastbit->keys_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't decrement dataset refcount"); + if (FAIL == H5Dclose(fastbit->keys_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close dataset"); + } + + key_array_size = nkeys_totalsize; + printf("H5X__fastbit_build_parallel_index:\n\tkey_array_size = %lld, nkeys_offset=%lld\n", key_array_size, nkeys_offset); + if (FAIL == H5X__fastbit_define_dataset(fastbit, KEYS_, gatherInfo, &key_array_size, &nkeys_offset, &key_space_id, &key_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + /* -------------- */ + /* OFFSETS */ + /* -------------- */ + if (fastbit->offsets_id != FAIL) { + if (FAIL == H5Odecr_refcount(fastbit->offsets_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't decrement dataset refcount"); + if (FAIL == H5Dclose(fastbit->offsets_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close dataset"); + } + + offset_array_size = noffsets_totalsize; + printf("\toffset_array_size = %lld, noffsets_offset=%lld\n", offset_array_size, noffsets_offset); + if (FAIL == H5X__fastbit_define_dataset(fastbit, OFFSETS_, gatherInfo, &offset_array_size, &noffsets_offset, &offset_space_id, &offset_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + /* -------------- */ + /* BITMAPS */ + /* -------------- */ + if (fastbit->bitmaps_id != FAIL) { + if (FAIL == H5Odecr_refcount(fastbit->bitmaps_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTINC, FAIL, "can't decrement dataset refcount"); + if (FAIL == H5Dclose(fastbit->bitmaps_id)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCLOSEOBJ, FAIL, "can't close dataset"); + } + + bitmap_array_size = nbitmaps_totalsize; + printf("\tbitmap_array_size = 
%lld, nbitmaps_offset=%lld\n", bitmap_array_size, nbitmaps_offset); + if (FAIL == H5X__fastbit_define_dataset(fastbit, BITMAPS_, gatherInfo, &bitmap_array_size, &nbitmaps_offset, &bitmap_space_id, &bitmap_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + /* NO ERRORS (to this point)! */ + /* ===========================*/ + /* Write info about the various sizes-per-rank per dataset (see gatherInfo above) */ + if (FAIL == H5Dwrite(fastbit->index_info_group_id, fastbit->opaque_type_id, info_memspace_id, + info_space_id, dxpl_id, gatherInfo)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTUPDATE, FAIL, "can't write infogroup metadata"); + + /* Write keys */ + if (FAIL == H5Dwrite(fastbit->keys_id, fastbit->opaque_type_id, key_memspace_id, + key_space_id, dxpl_id, fastbit->keys)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTUPDATE, FAIL, "can't write index metadata"); + + /* Write offsets */ + if (FAIL == H5Dwrite(fastbit->offsets_id, fastbit->opaque_type_id, offset_memspace_id, + offset_space_id, dxpl_id, fastbit->offsets)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTUPDATE, FAIL, "can't write index metadata"); + + /* Write bitmaps */ + if (FAIL == H5Dwrite(fastbit->bitmaps_id, fastbit->opaque_type_id, bitmap_memspace_id, + bitmap_space_id, dxpl_id, fastbit->bitmaps)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTUPDATE, FAIL, "can't write index metadata"); + + +done: + if (key_space_id != FAIL) + H5Sclose(key_space_id); + if (offset_space_id != FAIL) + H5Sclose(offset_space_id); + if (bitmap_space_id != FAIL) + H5Sclose(bitmap_space_id); + if (info_space_id != FAIL) + H5Sclose(info_space_id); + if ((key_memspace_id != H5S_ALL) && (key_memspace_id != FAIL)) + H5Sclose(key_memspace_id); + if ((offset_memspace_id != H5S_ALL) && (offset_memspace_id != FAIL)) + H5Sclose(offset_memspace_id); + if ((bitmap_memspace_id != H5S_ALL) && (bitmap_memspace_id != FAIL)) + H5Sclose(bitmap_memspace_id); + if ((info_memspace_id != H5S_ALL) && (info_memspace_id != FAIL)) + H5Sclose(info_memspace_id); + if (dxpl_id != H5P_DEFAULT) + H5Pclose(dxpl_id); + if (err_occurred) { + H5MM_free(gatherInfo); + H5MM_free(fastbit->keys); + fastbit->keys = NULL; + H5MM_free(fastbit->offsets); + fastbit->offsets = NULL; + H5MM_free(fastbit->bitmaps); + fastbit->bitmaps = NULL; + } + FUNC_LEAVE_NOAPI(ret_value) +} /* H5X__fastbit_build_index */ + +#endif /* H5_HAVE_PARALLEL */ + /*------------------------------------------------------------------------- * Function: H5X__fastbit_merge_data * @@ -621,7 +1119,6 @@ H5X__fastbit_scatter_cb(const void **src_buf/*out*/, size_t *src_buf_bytes_used/ void *_info) { struct H5X_fastbit_scatter_info *info = (struct H5X_fastbit_scatter_info *) _info; - /* Set output variables */ *src_buf = info->src_buf; *src_buf_bytes_used = info->src_buf_size; @@ -647,18 +1144,42 @@ H5X__fastbit_merge_data(H5X_fastbit_t *fastbit, const void *data, if (FAIL == (type_id = H5Dget_type(fastbit->dataset_id))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get type from dataset"); - if (FAIL == (space_id = H5Dget_space(fastbit->dataset_id))) - HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get dataspace from dataset"); - if (0 == (nelmts_data = (size_t) H5Sget_select_npoints(dataspace_id))) - HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); if (0 == (data_elmt_size = H5Tget_size(type_id))) HGOTO_ERROR(H5E_DATATYPE, H5E_BADTYPE, FAIL, "invalid size of element"); - info.src_buf = data; - info.src_buf_size = nelmts_data * data_elmt_size; +#ifdef H5_HAVE_PARALLEL + /* For 
parallel operations, the dataspace_id being provided is one which + * the defines the local view of the on-disk file, i.e. a hyperslab with + * MPI rank specific offsets. Here, we've already + * written data to the file and need to merge the local data with what + * what previously there. We're merging these buffers in memory + * so we don't want to deal with the offsets that were needed to write the + * local slice onto disk. + */ + if (H5Xparallel_queries_enabled() > 0) { + if (fastbit->memspace_id != FAIL) + dataspace_id = fastbit->memspace_id; + nelmts_data = (size_t) H5Sget_select_npoints(dataspace_id); + // printf("select_npoints(fastbit->filespace): nelmts = %llu, data_elmt_size = %llu\n", nelmts_data, data_elmt_size); + + info.src_buf = data; + info.src_buf_size = nelmts_data * data_elmt_size; + if (FAIL == H5Dscatter(H5X__fastbit_scatter_cb, &info, type_id, dataspace_id, buf)) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "cannot scatter data"); - if (FAIL == H5Dscatter(H5X__fastbit_scatter_cb, &info, type_id, dataspace_id, buf)) - HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "cannot scatter data"); + } + else +#else + { + if (0 == (nelmts_data = (size_t) H5Sget_select_npoints(dataspace_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + + info.src_buf = data; + info.src_buf_size = nelmts_data * data_elmt_size; + if (FAIL == H5Dscatter(H5X__fastbit_scatter_cb, &info, type_id, dataspace_id, buf)) + HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "cannot scatter data"); + } +#endif done: if (type_id != FAIL) @@ -679,34 +1200,50 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5X__fastbit_serialize_metadata(H5X_fastbit_t *fastbit, void *buf, - size_t *buf_size) +H5X__fastbit_serialize_metadata(H5X_fastbit_t *fastbit, void *buf, size_t *buf_size) { + /* The implementation records 3 values (for the anonymous datasets) + * {keys_id, offsets_id, bitmaps_id} + */ + int extra_info = 0; size_t metadata_size = 3 * sizeof(haddr_t); herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_NOAPI_NOINIT +#ifdef H5_HAVE_PARALLEL + if (H5Xparallel_queries_enabled() > 0) { + extra_info = 1; + metadata_size += sizeof(haddr_t); + } +#endif if (buf) { H5O_info_t dset_info; char *buf_ptr = (char *) buf; dset_info.addr = 0; + /* Maybe encode info_group */ + if (extra_info) { + if ((fastbit->index_info_group_id != (hid_t)FAIL) && (FAIL == H5Oget_info(fastbit->index_info_group_id, &dset_info))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get info for anonymous dataset"); + HDmemcpy(buf_ptr, &dset_info.addr, sizeof(haddr_t)); + buf_ptr += sizeof(haddr_t); + } /* Encode keys info */ - if ((fastbit->keys_id != FAIL) && (FAIL == H5Oget_info(fastbit->keys_id, &dset_info))) + if ((fastbit->keys_id != (hid_t)FAIL) && (FAIL == H5Oget_info(fastbit->keys_id, &dset_info))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get info for anonymous dataset"); HDmemcpy(buf_ptr, &dset_info.addr, sizeof(haddr_t)); buf_ptr += sizeof(haddr_t); /* Encode offsets info */ - if ((fastbit->offsets_id != FAIL) && (FAIL == H5Oget_info(fastbit->offsets_id, &dset_info))) + if ((fastbit->offsets_id != (hid_t)FAIL) && (FAIL == H5Oget_info(fastbit->offsets_id, &dset_info))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get info for anonymous dataset"); HDmemcpy(buf_ptr, &dset_info.addr, sizeof(haddr_t)); buf_ptr += sizeof(haddr_t); /* Encode bitmaps info */ - if ((fastbit->bitmaps_id != FAIL) && (FAIL == H5Oget_info(fastbit->bitmaps_id, 
&dset_info))) + if ((fastbit->bitmaps_id != (hid_t)FAIL) && (FAIL == H5Oget_info(fastbit->bitmaps_id, &dset_info))) HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get info for anonymous dataset"); HDmemcpy(buf_ptr, &dset_info.addr, sizeof(haddr_t)); buf_ptr += sizeof(haddr_t); @@ -737,6 +1274,15 @@ H5X__fastbit_deserialize_metadata(H5X_fastbit_t *fastbit, void *buf) FUNC_ENTER_NOAPI_NOINIT +#ifdef H5_HAVE_PARALLEL + if (H5Xparallel_queries_enabled() > 0) { + addr = *((haddr_t *) buf_ptr); + if (addr && (FAIL == (fastbit->index_info_group_id = H5Oopen_by_addr(fastbit->file_id, *((haddr_t *) buf_ptr))))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTOPENOBJ, FAIL, "can't open anonymous dataset"); + buf_ptr += sizeof(haddr_t); + } +#endif + /* Decode keys info */ addr = *((haddr_t *) buf_ptr); if (addr && (FAIL == (fastbit->keys_id = H5Oopen_by_addr(fastbit->file_id, addr)))) @@ -753,6 +1299,7 @@ H5X__fastbit_deserialize_metadata(H5X_fastbit_t *fastbit, void *buf) addr = *((haddr_t *) buf_ptr); if (addr && (FAIL == (fastbit->bitmaps_id = H5Oopen_by_addr(fastbit->file_id, *((haddr_t *) buf_ptr))))) HGOTO_ERROR(H5E_INDEX, H5E_CANTOPENOBJ, FAIL, "can't open anonymous dataset"); + buf_ptr += sizeof(haddr_t); done: FUNC_LEAVE_NOAPI(ret_value) @@ -781,6 +1328,189 @@ H5X__fastbit_read_bitmaps(void *context, uint64_t start, uint64_t count, } static herr_t +H5X__fastbit_reconstruct_parallel_index(H5X_fastbit_t *fastbit) +{ + hid_t dxpl_id = H5P_DEFAULT; + hid_t keys_space_id = FAIL, offsets_space_id = FAIL, bitmaps_space_id = FAIL; + hid_t key_memspace_id = H5S_ALL; + hid_t offset_memspace_id = H5S_ALL; + hid_t bitmap_memspace_id = H5S_ALL; + + size_t key_array_size, offset_array_size, bitmap_array_size; + FastBitDataType fastbit_type; + hid_t type_id = FAIL, space_id = FAIL; + size_t nelmts; + + herr_t ret_value = SUCCEED; /* Return value */ + int64_t nkeys_totalsize = 0, nkeys_offset = 0; + int64_t noffsets_totalsize = 0, noffsets_offset = 0; + int64_t nbitmaps_totalsize = 0, nbitmaps_offset = 0; + + struct fastbit_meta_collection *gatherInfo = NULL; + hid_t info_space_id = FAIL; + size_t info_array_size; + int i, nranks; + int mpi_size = H5Xparallel_size(); + int mpi_rank = H5Xparallel_rank(); + + FUNC_ENTER_NOAPI_NOINIT + + /* Read the spaceinfo from the anonymous dataset. + * We compare the computed total sizes against the sizes of the stored + * index datasets to ensure that it is safe to proceed. + */ + if (FAIL == (info_space_id = H5Dget_space(fastbit->index_info_group_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get dataspace from index"); + if (0 == (info_array_size = (size_t) H5Sget_select_npoints(info_space_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + if (NULL == (gatherInfo = (struct fastbit_meta_collection *) H5MM_malloc(info_array_size))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate gatherInfo"); + if (FAIL == H5Dread(fastbit->index_info_group_id, fastbit->opaque_type_id, + H5S_ALL, info_space_id, H5P_DEFAULT, gatherInfo)) + HGOTO_ERROR(H5E_INDEX, H5E_READERROR, FAIL, "can't read gatherInfo data"); + + /* The gatherInfo array is N groups of 3 int64_t values. + * Dividing the total dataset size by the size of a single entry + * gives the number of per-rank entries stored; this must match + * the current MPI size. + */ + nranks = info_array_size / sizeof(struct fastbit_meta_collection); + + /* Make sure that the stored information is a match for the + * current parallel execution. 
+ */ + HDassert(mpi_size == nranks); + for(i=0; i< nranks; i++) { + if (i == mpi_rank) { + nkeys_offset = nkeys_totalsize; + noffsets_offset = noffsets_totalsize; + nbitmaps_offset = nbitmaps_totalsize; + } + nkeys_totalsize += gatherInfo[i].nkeys; + noffsets_totalsize += gatherInfo[i].noffsets; + nbitmaps_totalsize += gatherInfo[i].nbitmaps; + } + /* Prepare to read nkeys */ + if (FAIL == (keys_space_id = H5Dget_space(fastbit->keys_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get dataspace from index"); + if (0 == (key_array_size = (size_t) H5Sget_select_npoints(keys_space_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + + HDassert(nkeys_totalsize == key_array_size); + fastbit->nkeys = gatherInfo[mpi_rank].nkeys / sizeof(double); + + /* Prepare to read noffsets */ + if (FAIL == (offsets_space_id = H5Dget_space(fastbit->offsets_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get dataspace from index"); + if (0 == (offset_array_size = (size_t) H5Sget_select_npoints(offsets_space_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + + HDassert(noffsets_totalsize == offset_array_size); + fastbit->noffsets = gatherInfo[mpi_rank].noffsets / sizeof(int64_t); + + /* Prepare to read nbitmaps */ + if (FAIL == (bitmaps_space_id = H5Dget_space(fastbit->bitmaps_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't get dataspace from index"); + if (0 == (bitmap_array_size = (size_t) H5Sget_select_npoints(bitmaps_space_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + + HDassert(nbitmaps_totalsize == bitmap_array_size); + fastbit->nbitmaps = gatherInfo[mpi_rank].nbitmaps / sizeof(uint32_t); + + if (NULL == (fastbit->keys = (double *) H5MM_malloc(gatherInfo[mpi_rank].nkeys))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate keys"); + if (NULL == (fastbit->offsets = (int64_t *) H5MM_malloc(gatherInfo[mpi_rank].noffsets))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate offsets"); + if (NULL == (fastbit->bitmaps = (uint32_t *) H5MM_malloc(gatherInfo[mpi_rank].nbitmaps))) + HGOTO_ERROR(H5E_INDEX, H5E_NOSPACE, FAIL, "can't allocate bitmaps"); + + /* define the hyperslab selection(s) */ + + key_array_size = nkeys_totalsize; + if (FAIL == H5X__fastbit_define_dataset(fastbit, KEYS_, gatherInfo, &key_array_size, &nkeys_offset, &keys_space_id, &key_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + offset_array_size = noffsets_totalsize; + if (FAIL == H5X__fastbit_define_dataset(fastbit, OFFSETS_, gatherInfo, &offset_array_size, &noffsets_offset, &offsets_space_id, &offset_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + bitmap_array_size = nbitmaps_totalsize; + if (FAIL == H5X__fastbit_define_dataset(fastbit, BITMAPS_, gatherInfo, &bitmap_array_size, &nbitmaps_offset, &bitmaps_space_id, &bitmap_memspace_id )) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCREATE, FAIL, "can't create simple (file) dataspace"); + + /* Create a transfer property to utilize MPI-IO */ + if ((dxpl_id = H5Pcreate(H5P_DATASET_XFER)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't define dataset_xfer property"); + if ((H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE)) < 0 ) + HGOTO_ERROR(H5E_INDEX, H5E_CANTGET, FAIL, "can't set dataset_xfer to MPIO_COLLECTIVE"); + + /* Read the actual metadata (keys) */ + if (FAIL == H5Dread(fastbit->keys_id, fastbit->opaque_type_id, + 
key_memspace_id, keys_space_id, dxpl_id, fastbit->keys)) + HGOTO_ERROR(H5E_INDEX, H5E_READERROR, FAIL, "can't read data"); + + /* Read FastBit offsets */ + if (FAIL == H5Dread(fastbit->offsets_id, fastbit->opaque_type_id, + offset_memspace_id, offsets_space_id, dxpl_id, fastbit->offsets)) + HGOTO_ERROR(H5E_INDEX, H5E_READERROR, FAIL, "can't read data"); + + /* Read FastBit bitmaps */ + if (FAIL == H5Dread(fastbit->bitmaps_id, fastbit->opaque_type_id, + bitmap_memspace_id, bitmaps_space_id, dxpl_id, fastbit->bitmaps)) + HGOTO_ERROR(H5E_INDEX, H5E_READERROR, FAIL, "can't read data"); + + + /* Reconstruct index */ + H5X_FASTBIT_LOG_DEBUG("Reconstructing index with nkeys=%lu, noffsets=%lu, " + "nbitmaps=%lu", fastbit->nkeys, fastbit->noffsets, fastbit->nbitmaps); + + /* Get space info */ + if (FAIL == (type_id = H5Dget_type(fastbit->dataset_id))) + HGOTO_ERROR(H5E_DATATYPE, H5E_CANTGET, FAIL, "can't get type from dataset"); + if (FAIL == (space_id = H5Dget_space(fastbit->dataset_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "can't get dataspace from dataset"); + if (0 == (nelmts = (size_t) H5Sget_select_npoints(space_id))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADVALUE, FAIL, "invalid number of elements"); + + /* Convert type to FastBit type */ + if (FastBitDataTypeUnknown == (fastbit_type = H5X__fastbit_convert_type(type_id))) + HGOTO_ERROR(H5E_INDEX, H5E_CANTCONVERT, FAIL, "can't convert type"); + + /* Register array */ + if (0 != fastbit_iapi_register_array_index_only(fastbit->column_name, + fastbit_type, &nelmts, 1, fastbit->keys, fastbit->nkeys, + fastbit->offsets, fastbit->noffsets, fastbit->bitmaps, + H5X__fastbit_read_bitmaps)) + HGOTO_ERROR(H5E_INDEX, H5E_CANTREGISTER, FAIL, "can't register array"); + + H5X_FASTBIT_LOG_DEBUG("Reconstructed index"); + fastbit->idx_reconstructed = TRUE; + +done: + + if (FAIL != type_id) + H5Tclose(type_id); + if (FAIL != space_id) + H5Sclose(space_id); + if (dxpl_id != H5P_DEFAULT) + H5Pclose(dxpl_id); + if (FAIL != keys_space_id) + H5Sclose(keys_space_id); + if (FAIL != offsets_space_id) + H5Sclose(offsets_space_id); + if (FAIL != bitmaps_space_id) + H5Sclose(bitmaps_space_id); + if ((key_memspace_id != H5S_ALL) && (key_memspace_id != FAIL)) + H5Sclose(key_memspace_id); + if ((offset_memspace_id != H5S_ALL) && (offset_memspace_id != FAIL)) + H5Sclose(offset_memspace_id); + if ((bitmap_memspace_id != H5S_ALL) && (bitmap_memspace_id != FAIL)) + H5Sclose(bitmap_memspace_id); + + FUNC_LEAVE_NOAPI(ret_value); +} + +static herr_t H5X__fastbit_reconstruct_index(H5X_fastbit_t *fastbit) { hid_t keys_space_id = FAIL, offsets_space_id = FAIL, bitmaps_space_id = FAIL; @@ -790,6 +1520,10 @@ H5X__fastbit_reconstruct_index(H5X_fastbit_t *fastbit) size_t nelmts; herr_t ret_value = SUCCEED; /* Return value */ +#ifdef H5_HAVE_PARALLEL + if (H5Xparallel_queries_enabled() > 0) + return H5X__fastbit_reconstruct_parallel_index(fastbit); +#endif FUNC_ENTER_NOAPI_NOINIT /* TODO don't read keys and offsets if already present */ @@ -1073,7 +1807,7 @@ H5X_fastbit_create(hid_t dataset_id, hid_t xcpl_id, hid_t H5_ATTR_UNUSED xapl_id { H5X_fastbit_t *fastbit = NULL; void *ret_value = NULL; /* Return value */ - size_t private_metadata_size; + size_t private_metadata_size = 0; void *buf = NULL; size_t buf_size; hbool_t read_on_create = TRUE; @@ -1243,7 +1977,7 @@ H5X_fastbit_close(void *idx_handle) FUNC_ENTER_NOAPI_NOINIT H5X_FASTBIT_LOG_DEBUG("Enter"); - + if (NULL == fastbit) HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "NULL index handle"); @@ -1265,17 +1999,20 @@ done: 
*------------------------------------------------------------------------- */ static herr_t -H5X_fastbit_pre_update(void *idx_handle, hid_t H5_ATTR_UNUSED dataspace_id, hid_t H5_ATTR_UNUSED xxpl_id) +H5X_fastbit_pre_update(void *idx_handle, hid_t filespace_id, hid_t H5_ATTR_UNUSED xxpl_id) { H5X_fastbit_t *fastbit = (H5X_fastbit_t *) idx_handle; herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_NOAPI_NOINIT H5X_FASTBIT_LOG_DEBUG("Enter"); - if (NULL == fastbit) HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "NULL index handle"); +#ifdef H5_HAVE_PARALLEL + fastbit->filespace_id = filespace_id; +#endif + done: H5X_FASTBIT_LOG_DEBUG("Leave"); FUNC_LEAVE_NOAPI(ret_value) diff --git a/src/H5Xmeta_dummy.c b/src/H5Xmeta_dummy.c index 9c6aafb..7f69183 100644 --- a/src/H5Xmeta_dummy.c +++ b/src/H5Xmeta_dummy.c @@ -275,6 +275,7 @@ const H5X_class_t H5X_META_DUMMY[1] = {{ H5X_PLUGIN_META_DUMMY, /* (Or whatever number is assigned) */ "dummy index plugin", /* Whatever name desired */ H5X_TYPE_METADATA, /* This plugin operates on metadata */ + H5X_SIMPLE_QUERY, &idx_class /* Index class */ }}; diff --git a/src/H5Xprivate.h b/src/H5Xprivate.h index 598ef6d..c9f41ca 100644 --- a/src/H5Xprivate.h +++ b/src/H5Xprivate.h @@ -18,6 +18,9 @@ #ifndef _H5Xprivate_H #define _H5Xprivate_H +/* Include configuration info */ +#include "H5pubconf.h" + /* Include package's public header */ #include "H5Xpublic.h" @@ -61,4 +64,6 @@ H5_DLL herr_t H5X_get_count(hid_t loc_id, hsize_t *idx_count); H5_DLL herr_t H5X_get_size(hid_t loc_id, hsize_t *idx_size); +H5_DLL herr_t H5Xgather_local_indices(void *buf, hid_t type_id, hid_t space_id, void *op, void *operator_data); + #endif /* _H5Xprivate_H */ diff --git a/src/H5Xpublic.h b/src/H5Xpublic.h index 5caa56e..88fa008 100644 --- a/src/H5Xpublic.h +++ b/src/H5Xpublic.h @@ -53,8 +53,8 @@ /* Index type */ typedef enum { - H5X_TYPE_DATA, /* Data index */ - H5X_TYPE_METADATA /* Metadata index */ + H5X_TYPE_DATA, /* Data index */ + H5X_TYPE_METADATA /* Metadata index */ } H5X_type_t; typedef struct { @@ -105,6 +105,10 @@ typedef union { H5X_metadata_class_t metadata_class; } H5X_idx_class_t; +#define H5X_SIMPLE_QUERY 1 +#define H5X_COMPOUND_QUERY 2 +#define H5X_DERIVED_VALUE_QUERY 4 + typedef struct { unsigned version; /* Version number of the index plugin class struct */ /* (Should always be set to H5X_CLASS_VERSION, which @@ -112,7 +116,8 @@ typedef struct { unsigned id; /* Index ID (assigned by The HDF Group, for now) */ const char *idx_name; /* Index name (for debugging only, currently) */ H5X_type_t type; /* Type of data indexed by this plugin */ - + int query_types; /* Plugin supports multiple (compound) query conditions */ + /* (see H5X_query_types - above) */ /* Callbacks */ H5X_idx_class_t *idx_class; /* Callback index class */ } H5X_class_t; @@ -141,6 +146,14 @@ H5_DLL hsize_t H5Xget_size(hid_t loc_id); H5_DLL herr_t H5Xget_type(hid_t object_id, hsize_t index_idx, unsigned *plugin_id); */ +#ifdef H5_HAVE_PARALLEL +H5_DLL herr_t H5Xinitialize_parallel_query(void); +H5_DLL int H5Xparallel_queries_enabled(void); +H5_DLL int H5Xslab_set(hid_t filespace_id, hsize_t **start, hsize_t **count, hsize_t **stride, hsize_t **block); +H5_DLL int H5Xparallel_rank(void); +H5_DLL int H5Xparallel_size(void); +H5_DLL herr_t H5Xallgather_by_size(void *xdata, int nelems, int typesize); +#endif #ifdef __cplusplus } diff --git a/src/Makefile.am b/src/Makefile.am index 09047fc..9eefe69 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -110,7 +110,7 @@ libhdf5_la_SOURCES= H5.c H5checksum.c 
H5dbg.c H5system.c H5timer.c H5trace.c \ H5Torder.c \ H5Tref.c \ H5Tpad.c H5Tprecis.c H5Tstrpad.c H5Tvisit.c H5Tvlen.c H5TS.c H5VM.c H5WB.c H5Z.c \ - H5X.c H5Xdummy.c H5Xalacrity.c H5Xfastbit.c \ + H5X.c H5Xdummy.c H5Xmeta_dummy.c H5Xalacrity.c H5Xfastbit.c \ H5Zdeflate.c H5Zfletcher32.c H5Znbit.c H5Zshuffle.c \ H5Zscaleoffset.c H5Zszip.c H5Ztrans.c |
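For context, a minimal usage sketch (not part of the patch) of the two additions made to H5Xpublic.h above: the query_types bitmask on H5X_class_t and the H5_HAVE_PARALLEL query helpers. The function names supports_compound() and report_parallel_layout() are hypothetical; they only illustrate how a plugin or caller might consume these declarations, assuming the patched headers from this branch.

/* Hypothetical illustration; assumes the H5Xpublic.h declarations added by this commit. */
#include <stdio.h>
#include "H5Xpublic.h"

/* A data-index plugin that can evaluate combined (AND/OR) query conditions
 * advertises that by setting H5X_COMPOUND_QUERY in its query_types bitmask. */
int
supports_compound(const H5X_class_t *cls)
{
    return (cls->query_types & H5X_COMPOUND_QUERY) != 0;
}

/* Report the MPI layout seen by the parallel-query helpers, when available. */
void
report_parallel_layout(void)
{
#ifdef H5_HAVE_PARALLEL
    if (H5Xparallel_queries_enabled() > 0)
        printf("parallel queries enabled: rank %d of %d\n",
               H5Xparallel_rank(), H5Xparallel_size());
#endif
}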