From 3b06faba8434efa24cfed827318b151e0158be89 Mon Sep 17 00:00:00 2001 From: Mohamad Chaarawi Date: Mon, 5 May 2014 10:24:11 -0500 Subject: [svn-r25160] - fix issue with global variables for Mercury callback IDs - don't memcpy in coresident mode (in H5Dwrite - more comming soon). --- examples/h5ff_client_dset.c | 2 +- src/H5VLiod.c | 72 +++++++++++++++ src/H5VLiod_attr.c | 4 + src/H5VLiod_common.h | 136 ++++++++++++++-------------- src/H5VLiod_dset.c | 212 ++++++++++++++++++++++++++------------------ src/H5VLiod_map.c | 16 ++-- src/H5VLiod_server.c | 2 +- src/H5VLiod_server.h | 3 +- src/H5VLiod_util.c | 19 +++- 9 files changed, 301 insertions(+), 165 deletions(-) diff --git a/examples/h5ff_client_dset.c b/examples/h5ff_client_dset.c index 3b1e427..b5dfb89 100644 --- a/examples/h5ff_client_dset.c +++ b/examples/h5ff_client_dset.c @@ -541,7 +541,7 @@ int main(int argc, char **argv) { ret = H5Gclose_ff(gid1, e_stack); assert(ret == 0); - H5Fclose_ff(file_id, 1, H5_EVENT_STACK_NULL); + H5Fclose_ff(file_id, 0, H5_EVENT_STACK_NULL); H5ESget_count(e_stack, &num_events); diff --git a/src/H5VLiod.c b/src/H5VLiod.c index 1db9dff..df44932 100644 --- a/src/H5VLiod.c +++ b/src/H5VLiod.c @@ -56,6 +56,78 @@ static int coresident = 0; static AXE_task_t g_axe_id; static H5VL_iod_axe_list_t axe_list; +/* function shipper IDs for different routines */ +hg_id_t H5VL_EFF_INIT_ID; +hg_id_t H5VL_EFF_FINALIZE_ID; +hg_id_t H5VL_ANALYSIS_EXECUTE_ID; +hg_id_t H5VL_FILE_CREATE_ID; +hg_id_t H5VL_FILE_OPEN_ID; +hg_id_t H5VL_FILE_CLOSE_ID; +hg_id_t H5VL_ATTR_CREATE_ID; +hg_id_t H5VL_ATTR_OPEN_ID; +hg_id_t H5VL_ATTR_READ_ID; +hg_id_t H5VL_ATTR_WRITE_ID; +hg_id_t H5VL_ATTR_EXISTS_ID; +//hg_id_t H5VL_ATTR_ITERATE_ID; +hg_id_t H5VL_ATTR_RENAME_ID; +hg_id_t H5VL_ATTR_REMOVE_ID; +hg_id_t H5VL_ATTR_CLOSE_ID; +hg_id_t H5VL_GROUP_CREATE_ID; +hg_id_t H5VL_GROUP_OPEN_ID; +hg_id_t H5VL_GROUP_CLOSE_ID; +hg_id_t H5VL_MAP_CREATE_ID; +hg_id_t H5VL_MAP_OPEN_ID; +hg_id_t H5VL_MAP_SET_ID; +hg_id_t H5VL_MAP_GET_ID; +hg_id_t H5VL_MAP_GET_COUNT_ID; +hg_id_t H5VL_MAP_EXISTS_ID; +//hg_id_t H5VL_MAP_ITERATE_ID; +hg_id_t H5VL_MAP_DELETE_ID; +hg_id_t H5VL_MAP_CLOSE_ID; +hg_id_t H5VL_DSET_CREATE_ID; +hg_id_t H5VL_DSET_OPEN_ID; +hg_id_t H5VL_DSET_READ_ID; +hg_id_t H5VL_DSET_GET_VL_SIZE_ID; +hg_id_t H5VL_DSET_WRITE_ID; +hg_id_t H5VL_DSET_SET_EXTENT_ID; +hg_id_t H5VL_DSET_CLOSE_ID; +hg_id_t H5VL_DTYPE_COMMIT_ID; +hg_id_t H5VL_DTYPE_OPEN_ID; +hg_id_t H5VL_DTYPE_CLOSE_ID; +hg_id_t H5VL_LINK_CREATE_ID; +hg_id_t H5VL_LINK_MOVE_ID; +//hg_id_t H5VL_LINK_ITERATE_ID; +hg_id_t H5VL_LINK_EXISTS_ID; +hg_id_t H5VL_LINK_GET_INFO_ID; +hg_id_t H5VL_LINK_GET_VAL_ID; +hg_id_t H5VL_LINK_REMOVE_ID; +hg_id_t H5VL_OBJECT_OPEN_BY_TOKEN_ID; +hg_id_t H5VL_OBJECT_OPEN_ID; +//hg_id_t H5VL_OBJECT_COPY_ID; +//hg_id_t H5VL_OBJECT_VISIT_ID; +hg_id_t H5VL_OBJECT_EXISTS_ID; +hg_id_t H5VL_OBJECT_SET_COMMENT_ID; +hg_id_t H5VL_OBJECT_GET_COMMENT_ID; +hg_id_t H5VL_OBJECT_GET_INFO_ID; +hg_id_t H5VL_RC_ACQUIRE_ID; +hg_id_t H5VL_RC_RELEASE_ID; +hg_id_t H5VL_RC_PERSIST_ID; +hg_id_t H5VL_RC_SNAPSHOT_ID; +hg_id_t H5VL_TR_START_ID; +hg_id_t H5VL_TR_FINISH_ID; +hg_id_t H5VL_TR_SET_DEPEND_ID; +hg_id_t H5VL_TR_SKIP_ID; +hg_id_t H5VL_TR_ABORT_ID; +hg_id_t H5VL_PREFETCH_ID; +hg_id_t H5VL_EVICT_ID; +hg_id_t H5VL_CANCEL_OP_ID; +hg_id_t H5VL_VIEW_CREATE_ID; +#ifdef H5_HAVE_INDEXING +hg_id_t H5VL_DSET_SET_INDEX_INFO_ID; +hg_id_t H5VL_DSET_GET_INDEX_INFO_ID; +hg_id_t H5VL_DSET_RM_INDEX_INFO_ID; +#endif + /* Prototypes */ static void *H5VL_iod_fapl_copy(const void *_old_fa); static herr_t H5VL_iod_fapl_free(void *_fa); diff --git a/src/H5VLiod_attr.c b/src/H5VLiod_attr.c index 58a4117..bfe35ac 100644 --- a/src/H5VLiod_attr.c +++ b/src/H5VLiod_attr.c @@ -493,6 +493,8 @@ H5VL_iod_server_attr_read_cb(AXE_engine_t UNUSED axe_engine, int ndims; /* dataset's rank/number of dimensions */ hssize_t num_descriptors = 0; /* number of IOD file descriptors needed to describe filespace selection */ na_addr_t dest = HG_Handler_get_addr(op_data->hg_handle); /* destination address to push data to */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, dest); hbool_t opened_locally = FALSE; /* flag to indicate whether we opened the attribute here or if it was already open */ iod_ret_t ret; herr_t ret_value = SUCCEED; @@ -693,6 +695,8 @@ H5VL_iod_server_attr_write_cb(AXE_engine_t UNUSED axe_engine, iod_checksum_t attr_cs = 0; hssize_t num_descriptors = 0; /* number of IOD file descriptors needed to describe filespace selection*/ na_addr_t source = HG_Handler_get_addr(op_data->hg_handle); /* source address to pull data from */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, source); hbool_t opened_locally = FALSE; /* flag to indicate whether we opened the attribute here or if it was already opened */ iod_ret_t ret; herr_t ret_value = SUCCEED; diff --git a/src/H5VLiod_common.h b/src/H5VLiod_common.h index 5a421ac..91b0f8d 100644 --- a/src/H5VLiod_common.h +++ b/src/H5VLiod_common.h @@ -33,75 +33,75 @@ #define H5_EFF_DEBUG 1 /* function shipper IDs for different routines */ -hg_id_t H5VL_EFF_INIT_ID; -hg_id_t H5VL_EFF_FINALIZE_ID; -hg_id_t H5VL_ANALYSIS_EXECUTE_ID; -hg_id_t H5VL_FILE_CREATE_ID; -hg_id_t H5VL_FILE_OPEN_ID; -hg_id_t H5VL_FILE_CLOSE_ID; -hg_id_t H5VL_ATTR_CREATE_ID; -hg_id_t H5VL_ATTR_OPEN_ID; -hg_id_t H5VL_ATTR_READ_ID; -hg_id_t H5VL_ATTR_WRITE_ID; -hg_id_t H5VL_ATTR_EXISTS_ID; -//hg_id_t H5VL_ATTR_ITERATE_ID; -hg_id_t H5VL_ATTR_RENAME_ID; -hg_id_t H5VL_ATTR_REMOVE_ID; -hg_id_t H5VL_ATTR_CLOSE_ID; -hg_id_t H5VL_GROUP_CREATE_ID; -hg_id_t H5VL_GROUP_OPEN_ID; -hg_id_t H5VL_GROUP_CLOSE_ID; -hg_id_t H5VL_MAP_CREATE_ID; -hg_id_t H5VL_MAP_OPEN_ID; -hg_id_t H5VL_MAP_SET_ID; -hg_id_t H5VL_MAP_GET_ID; -hg_id_t H5VL_MAP_GET_COUNT_ID; -hg_id_t H5VL_MAP_EXISTS_ID; -//hg_id_t H5VL_MAP_ITERATE_ID; -hg_id_t H5VL_MAP_DELETE_ID; -hg_id_t H5VL_MAP_CLOSE_ID; -hg_id_t H5VL_DSET_CREATE_ID; -hg_id_t H5VL_DSET_OPEN_ID; -hg_id_t H5VL_DSET_READ_ID; -hg_id_t H5VL_DSET_GET_VL_SIZE_ID; -hg_id_t H5VL_DSET_WRITE_ID; -hg_id_t H5VL_DSET_SET_EXTENT_ID; -hg_id_t H5VL_DSET_CLOSE_ID; -hg_id_t H5VL_DTYPE_COMMIT_ID; -hg_id_t H5VL_DTYPE_OPEN_ID; -hg_id_t H5VL_DTYPE_CLOSE_ID; -hg_id_t H5VL_LINK_CREATE_ID; -hg_id_t H5VL_LINK_MOVE_ID; -//hg_id_t H5VL_LINK_ITERATE_ID; -hg_id_t H5VL_LINK_EXISTS_ID; -hg_id_t H5VL_LINK_GET_INFO_ID; -hg_id_t H5VL_LINK_GET_VAL_ID; -hg_id_t H5VL_LINK_REMOVE_ID; -hg_id_t H5VL_OBJECT_OPEN_BY_TOKEN_ID; -hg_id_t H5VL_OBJECT_OPEN_ID; -//hg_id_t H5VL_OBJECT_COPY_ID; -//hg_id_t H5VL_OBJECT_VISIT_ID; -hg_id_t H5VL_OBJECT_EXISTS_ID; -hg_id_t H5VL_OBJECT_SET_COMMENT_ID; -hg_id_t H5VL_OBJECT_GET_COMMENT_ID; -hg_id_t H5VL_OBJECT_GET_INFO_ID; -hg_id_t H5VL_RC_ACQUIRE_ID; -hg_id_t H5VL_RC_RELEASE_ID; -hg_id_t H5VL_RC_PERSIST_ID; -hg_id_t H5VL_RC_SNAPSHOT_ID; -hg_id_t H5VL_TR_START_ID; -hg_id_t H5VL_TR_FINISH_ID; -hg_id_t H5VL_TR_SET_DEPEND_ID; -hg_id_t H5VL_TR_SKIP_ID; -hg_id_t H5VL_TR_ABORT_ID; -hg_id_t H5VL_PREFETCH_ID; -hg_id_t H5VL_EVICT_ID; -hg_id_t H5VL_CANCEL_OP_ID; -hg_id_t H5VL_VIEW_CREATE_ID; +extern hg_id_t H5VL_EFF_INIT_ID; +extern hg_id_t H5VL_EFF_FINALIZE_ID; +extern hg_id_t H5VL_ANALYSIS_EXECUTE_ID; +extern hg_id_t H5VL_FILE_CREATE_ID; +extern hg_id_t H5VL_FILE_OPEN_ID; +extern hg_id_t H5VL_FILE_CLOSE_ID; +extern hg_id_t H5VL_ATTR_CREATE_ID; +extern hg_id_t H5VL_ATTR_OPEN_ID; +extern hg_id_t H5VL_ATTR_READ_ID; +extern hg_id_t H5VL_ATTR_WRITE_ID; +extern hg_id_t H5VL_ATTR_EXISTS_ID; +//extern hg_id_t H5VL_ATTR_ITERATE_ID; +extern hg_id_t H5VL_ATTR_RENAME_ID; +extern hg_id_t H5VL_ATTR_REMOVE_ID; +extern hg_id_t H5VL_ATTR_CLOSE_ID; +extern hg_id_t H5VL_GROUP_CREATE_ID; +extern hg_id_t H5VL_GROUP_OPEN_ID; +extern hg_id_t H5VL_GROUP_CLOSE_ID; +extern hg_id_t H5VL_MAP_CREATE_ID; +extern hg_id_t H5VL_MAP_OPEN_ID; +extern hg_id_t H5VL_MAP_SET_ID; +extern hg_id_t H5VL_MAP_GET_ID; +extern hg_id_t H5VL_MAP_GET_COUNT_ID; +extern hg_id_t H5VL_MAP_EXISTS_ID; +//extern hg_id_t H5VL_MAP_ITERATE_ID; +extern hg_id_t H5VL_MAP_DELETE_ID; +extern hg_id_t H5VL_MAP_CLOSE_ID; +extern hg_id_t H5VL_DSET_CREATE_ID; +extern hg_id_t H5VL_DSET_OPEN_ID; +extern hg_id_t H5VL_DSET_READ_ID; +extern hg_id_t H5VL_DSET_GET_VL_SIZE_ID; +extern hg_id_t H5VL_DSET_WRITE_ID; +extern hg_id_t H5VL_DSET_SET_EXTENT_ID; +extern hg_id_t H5VL_DSET_CLOSE_ID; +extern hg_id_t H5VL_DTYPE_COMMIT_ID; +extern hg_id_t H5VL_DTYPE_OPEN_ID; +extern hg_id_t H5VL_DTYPE_CLOSE_ID; +extern hg_id_t H5VL_LINK_CREATE_ID; +extern hg_id_t H5VL_LINK_MOVE_ID; +//extern hg_id_t H5VL_LINK_ITERATE_ID; +extern hg_id_t H5VL_LINK_EXISTS_ID; +extern hg_id_t H5VL_LINK_GET_INFO_ID; +extern hg_id_t H5VL_LINK_GET_VAL_ID; +extern hg_id_t H5VL_LINK_REMOVE_ID; +extern hg_id_t H5VL_OBJECT_OPEN_BY_TOKEN_ID; +extern hg_id_t H5VL_OBJECT_OPEN_ID; +//extern hg_id_t H5VL_OBJECT_COPY_ID; +//extern hg_id_t H5VL_OBJECT_VISIT_ID; +extern hg_id_t H5VL_OBJECT_EXISTS_ID; +extern hg_id_t H5VL_OBJECT_SET_COMMENT_ID; +extern hg_id_t H5VL_OBJECT_GET_COMMENT_ID; +extern hg_id_t H5VL_OBJECT_GET_INFO_ID; +extern hg_id_t H5VL_RC_ACQUIRE_ID; +extern hg_id_t H5VL_RC_RELEASE_ID; +extern hg_id_t H5VL_RC_PERSIST_ID; +extern hg_id_t H5VL_RC_SNAPSHOT_ID; +extern hg_id_t H5VL_TR_START_ID; +extern hg_id_t H5VL_TR_FINISH_ID; +extern hg_id_t H5VL_TR_SET_DEPEND_ID; +extern hg_id_t H5VL_TR_SKIP_ID; +extern hg_id_t H5VL_TR_ABORT_ID; +extern hg_id_t H5VL_PREFETCH_ID; +extern hg_id_t H5VL_EVICT_ID; +extern hg_id_t H5VL_CANCEL_OP_ID; +extern hg_id_t H5VL_VIEW_CREATE_ID; #ifdef H5_HAVE_INDEXING -hg_id_t H5VL_DSET_SET_INDEX_INFO_ID; -hg_id_t H5VL_DSET_GET_INDEX_INFO_ID; -hg_id_t H5VL_DSET_RM_INDEX_INFO_ID; +extern hg_id_t H5VL_DSET_SET_INDEX_INFO_ID; +extern hg_id_t H5VL_DSET_GET_INDEX_INFO_ID; +extern hg_id_t H5VL_DSET_RM_INDEX_INFO_ID; #endif /* Structure for a span of fixed-length data in a type */ diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c index 2df1fb7..8c00039 100644 --- a/src/H5VLiod_dset.c +++ b/src/H5VLiod_dset.c @@ -51,7 +51,8 @@ H5VL__iod_server_vl_data_write(iod_handle_t coh, iod_obj_id_t iod_id, iod_handle H5VL_iod_type_info_t type_info, size_t nelmts, size_t num_segments, void **addrs, size_t *sizes, hid_t dxpl_id, iod_trans_id_t wtid, iod_trans_id_t rtid, - na_addr_t source, hg_bulk_t bulk_handle, uint32_t cs_scope); + na_addr_t source, hg_bulk_t bulk_handle, uint32_t cs_scope, + na_bool_t is_coresident); static herr_t H5VL__iod_server_vl_data_read(iod_handle_t coh, AXE_engine_t axe_engine, AXE_task_t axe_id, @@ -580,6 +581,8 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t axe_engine, hbool_t is_vl_data; size_t nelmts; /* number of elements selected to read */ na_addr_t dest = HG_Handler_get_addr(op_data->hg_handle); /* destination address to push data to */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, dest); iod_ret_t ret; hbool_t opened_locally = FALSE; /* flag to indicate whether we opened the dset here or if it was already open */ herr_t ret_value = SUCCEED; @@ -622,7 +625,7 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t axe_engine, /* Adjust buffer is type conversion is needed. If the data elements are of variable length, just return that they are in is_vl_data for special processing */ - if(H5VL__iod_server_adjust_buffer(dst_id, src_id, nelmts, dxpl_id, + if(H5VL__iod_server_adjust_buffer(dst_id, src_id, nelmts, dxpl_id, is_coresident, size, &buf, &is_vl_data, &buf_size) < 0) HGOTO_ERROR_FF(FAIL, "failed to setup write operation"); @@ -765,6 +768,8 @@ H5VL_iod_server_dset_get_vl_size_cb(AXE_engine_t UNUSED axe_engine, size_t nelmts; /* number of elements selected to read */ uint8_t *buf_ptr = NULL; na_addr_t dest = HG_Handler_get_addr(op_data->hg_handle); /* destination address to push data to */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, dest); hbool_t opened_locally = FALSE; /* flag to indicate whether we opened the dset here or if it was already open */ iod_ret_t ret; herr_t ret_value = SUCCEED; @@ -936,10 +941,16 @@ H5VL_iod_server_dset_write_cb(AXE_engine_t UNUSED axe_engine, size_t nelmts; /* number of elements selected to read */ hbool_t flag = FALSE; /* temp flag to indicate whether corruption will be inserted */ na_addr_t source = HG_Handler_get_addr(op_data->hg_handle); /* source address to pull data from */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, source); hbool_t opened_locally = FALSE; /* flag to indicate whether we opened the dset here or if it was already open */ iod_ret_t ret; herr_t ret_value = SUCCEED; + /* MSC - for now do memcpy if segment count > 1 */ + if(is_coresident && 1 != HG_Bulk_handle_get_segment_count(bulk_handle)) + is_coresident = NA_FALSE; + /* open the dataset if we don't have the handle yet */ if(iod_oh.wr_oh.cookie == IOD_OH_UNDEFINED) { ret = iod_obj_open_write(coh, iod_id, wtid, NULL, &iod_oh.wr_oh, NULL); @@ -981,50 +992,68 @@ H5VL_iod_server_dset_write_cb(AXE_engine_t UNUSED axe_engine, if(vl_lengths_size == 0) HGOTO_ERROR_FF(FAIL, "no vlen lengths sent"); - if(NULL == (vl_lengths = (char *)malloc(vl_lengths_size))) - HGOTO_ERROR_FF(FAIL, "can't allocate vlen lengths buffer"); - - /* Create bulk handle */ - if(HG_SUCCESS != HG_Bulk_handle_create(1, &vl_lengths, &vl_lengths_size, - HG_BULK_READWRITE, &vl_len_handle)) - HGOTO_ERROR_FF(FAIL, "create vlen bulk handle"); - - /* Pull data from the client */ - if(HG_SUCCESS != HG_Bulk_transfer(HG_BULK_PULL, source, vl_len_bulk_handle, 0, - vl_len_handle, 0, vl_lengths_size, &bulk_request)) - HGOTO_ERROR_FF(FAIL, "Transfer data failed"); + if(is_coresident) { + size_t bulk_size = 0; - /* Wait for bulk data read to complete */ - if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) - HGOTO_ERROR_FF(FAIL, "can't wait for vlen lengths bulk data operation"); + vl_len_handle = vl_len_bulk_handle; + /* get mercury buffer where data is */ + if(HG_SUCCESS != HG_Bulk_handle_access(vl_len_handle, 0, vl_lengths_size, + HG_BULK_READWRITE, 1, + &vl_lengths, &bulk_size, NULL)) + HGOTO_ERROR_FF(FAIL, "Could not access handle"); -#if 0 - /* get mercury buffer where data is */ - if(HG_SUCCESS != HG_Bulk_handle_access(vl_len_handle, 0, vl_lengths_size, - HG_BULK_READWRITE, 1, - &vl_lengths, &vl_lengths_size, NULL)) - HGOTO_ERROR_FF(FAIL, "Could not access handle"); -#endif + assert(vl_lengths_size == bulk_size); + } + else { + if(NULL == (vl_lengths = (char *)malloc(vl_lengths_size))) + HGOTO_ERROR_FF(FAIL, "can't allocate vlen lengths buffer"); + + /* Create bulk handle */ + if(HG_SUCCESS != HG_Bulk_handle_create(1, &vl_lengths, &vl_lengths_size, + HG_BULK_READWRITE, &vl_len_handle)) + HGOTO_ERROR_FF(FAIL, "create vlen bulk handle"); + + /* Pull data from the client */ + if(HG_SUCCESS != HG_Bulk_transfer(HG_BULK_PULL, source, vl_len_bulk_handle, 0, + vl_len_handle, 0, vl_lengths_size, &bulk_request)) + HGOTO_ERROR_FF(FAIL, "Transfer data failed"); + + /* Wait for bulk data read to complete */ + if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) + HGOTO_ERROR_FF(FAIL, "can't wait for vlen lengths bulk data operation"); + } if(NULL == (buf = malloc(nelmts * type_info.size))) HGOTO_ERROR_FF(FAIL, "can't allocate data buffer"); /* Create segments from vl lengths */ - if(H5VL_iod_create_segments_recv((char *)buf, &type_info, nelmts, &addrs, &sizes, &num_segments, + if(H5VL_iod_create_segments_recv((char *)buf, &type_info, nelmts, &addrs, &sizes, &num_segments, vl_lengths, vl_lengths_size, &free_list, &free_list_len) < 0) HGOTO_ERROR_FF(FAIL, "can't create segments for bulk data transfer"); assert(addrs); assert(sizes); + free(buf); + buf = NULL; + ret = H5VL__iod_server_vl_data_write(coh, iod_id, iod_oh, space_id, src_id, dst_id, type_info, nelmts, num_segments, addrs, sizes, dxpl_id, wtid, rtid, - source, bulk_handle, raw_cs_scope); + source, bulk_handle, raw_cs_scope, is_coresident); if(ret != SUCCEED) HGOTO_ERROR_FF(ret, "can't write VL data to array object"); /* Free the bulk handle */ - if(HG_SUCCESS != HG_Bulk_handle_free(vl_len_handle)) - HGOTO_ERROR_FF(FAIL, "can't free vlen bulk handle"); + if(!is_coresident) { + if(HG_SUCCESS != HG_Bulk_handle_free(vl_len_handle)) + HGOTO_ERROR_FF(FAIL, "can't free vlen bulk handle"); + + /* Free vl_lengths */ + if(vl_lengths) { + free(vl_lengths); + vl_lengths = NULL; + vl_lengths_size = 0; + } + } /* Free segments */ if(addrs) { @@ -1042,18 +1071,6 @@ H5VL_iod_server_dset_write_cb(AXE_engine_t UNUSED axe_engine, free_list = NULL; free_list_len = 0; } /* end if */ - - /* Free vl_lengths */ - if(vl_lengths) { - free(vl_lengths); - vl_lengths = NULL; - vl_lengths_size = 0; - } - - if(buf) { - free(buf); - buf = NULL; - } } else { size_t elmt_size; @@ -1061,30 +1078,36 @@ H5VL_iod_server_dset_write_cb(AXE_engine_t UNUSED axe_engine, /* retrieve size of incoming bulk data */ size = HG_Bulk_handle_get_size(bulk_handle); - /* allocate buffer to hold data */ - if(NULL == (buf = malloc(size))) - HGOTO_ERROR_FF(FAIL, "can't allocate read buffer"); + if(is_coresident) { + size_t bulk_size = 0; - /* Create bulk handle */ - if(HG_SUCCESS != HG_Bulk_handle_create(1, &buf, &size, - HG_BULK_READWRITE, &bulk_block_handle)) - HGOTO_ERROR_FF(FAIL, "can't create bulk handle"); + bulk_block_handle = bulk_handle; + /* get mercury buffer where data is */ + if(HG_SUCCESS != HG_Bulk_handle_access(bulk_block_handle, 0, size, + HG_BULK_READWRITE, 1, &buf, &bulk_size, NULL)) + HGOTO_ERROR_FF(FAIL, "Could not access handle"); - /* Pull data from the client */ - if(HG_SUCCESS != HG_Bulk_transfer(HG_BULK_PULL, source, bulk_handle, 0, - bulk_block_handle, 0, size, &bulk_request)) - HGOTO_ERROR_FF(FAIL, "Transfer data failed"); - - /* Wait for bulk data read to complete */ - if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) - HGOTO_ERROR_FF(FAIL, "can't wait for bulk data operation"); - -#if 0 - /* get mercury buffer where data is */ - if(HG_SUCCESS != HG_Bulk_handle_access(bulk_block_handle, 0, size, - HG_BULK_READWRITE, 1, &buf, &size, NULL)) - HGOTO_ERROR_FF(FAIL, "Could not access handle"); -#endif + assert(size == bulk_size); + } + else { + /* allocate buffer to hold data */ + if(NULL == (buf = malloc(size))) + HGOTO_ERROR_FF(FAIL, "can't allocate read buffer"); + + /* Create bulk handle */ + if(HG_SUCCESS != HG_Bulk_handle_create(1, &buf, &size, + HG_BULK_READWRITE, &bulk_block_handle)) + HGOTO_ERROR_FF(FAIL, "can't create bulk handle"); + + /* Pull data from the client */ + if(HG_SUCCESS != HG_Bulk_transfer(HG_BULK_PULL, source, bulk_handle, 0, + bulk_block_handle, 0, size, &bulk_request)) + HGOTO_ERROR_FF(FAIL, "Transfer data failed"); + + /* Wait for bulk data read to complete */ + if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) + HGOTO_ERROR_FF(FAIL, "can't wait for bulk data operation"); + } /* MSC - check if client requested to corrupt data */ if(H5Pget_dxpl_inject_corruption(dxpl_id, &flag) < 0) @@ -1115,7 +1138,7 @@ H5VL_iod_server_dset_write_cb(AXE_engine_t UNUSED axe_engine, /* Adjust buffer is type conversion is needed. If the data elements are of variable length, just return that they are in is_vl_data for special processing */ - if(H5VL__iod_server_adjust_buffer(src_id, dst_id, nelmts, dxpl_id, + if(H5VL__iod_server_adjust_buffer(src_id, dst_id, nelmts, dxpl_id, is_coresident, size, &buf, &is_vl_data, &buf_size) < 0) HGOTO_ERROR_FF(FAIL, "failed to setup write operation"); @@ -1128,9 +1151,15 @@ H5VL_iod_server_dset_write_cb(AXE_engine_t UNUSED axe_engine, buf, buf_size, cs, raw_cs_scope, wtid); /* free the block handle */ - if(HG_SUCCESS != HG_Bulk_handle_free(bulk_block_handle)) - HGOTO_ERROR_FF(FAIL, "can't free bds block handle"); + if(!is_coresident) { + if(HG_SUCCESS != HG_Bulk_handle_free(bulk_block_handle)) + HGOTO_ERROR_FF(FAIL, "can't free bds block handle"); + if(buf) { + free(buf); + buf = NULL; + } + } if(ret != SUCCEED) HGOTO_ERROR_FF(ret, "can't write to array object"); } @@ -1685,7 +1714,8 @@ H5VL__iod_server_vl_data_write(iod_handle_t coh, iod_obj_id_t iod_id, iod_handle H5VL_iod_type_info_t type_info, size_t nelmts, size_t num_segments, void **addrs, size_t *sizes, hid_t UNUSED dxpl_id, iod_trans_id_t wtid, iod_trans_id_t rtid, - na_addr_t source, hg_bulk_t bulk_handle, uint32_t cs_scope) + na_addr_t source, hg_bulk_t bulk_handle, uint32_t cs_scope, + na_bool_t is_coresident) { char bogus; /* bogus value to pass to H5Diterate() */ H5VL_iod_server_vl_write_t udata; @@ -1704,28 +1734,36 @@ H5VL__iod_server_vl_data_write(iod_handle_t coh, iod_obj_id_t iod_id, iod_handle buf_size += sizes[u]; } /* end for */ - if(NULL == (buf = malloc(buf_size))) - HGOTO_ERROR_FF(FAIL, "can't allocate data buffer"); + if(is_coresident) { + size_t bulk_size = 0; - /* Create bulk handle */ - if(HG_SUCCESS != HG_Bulk_handle_create(1, &buf, &buf_size, - HG_BULK_READWRITE, &vl_data_handle)) - HGOTO_ERROR_FF(FAIL, "can't create bulk handle"); + vl_data_handle = bulk_handle; - /* Pull data from the client */ - if(HG_SUCCESS != HG_Bulk_transfer(HG_BULK_PULL, source, bulk_handle, 0, - vl_data_handle, 0, buf_size, &bulk_request)) - HGOTO_ERROR_FF(FAIL, "Transfer data failed"); + /* get mercury buffer where data is */ + if(HG_SUCCESS != HG_Bulk_handle_access(vl_data_handle, 0, buf_size, + HG_BULK_READWRITE, 1, &buf, &bulk_size, NULL)) + HGOTO_ERROR_FF(FAIL, "Could not access handle"); - /* Wait for bulk data read to complete */ - if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) - HGOTO_ERROR_FF(FAIL, "can't wait for bulk data operation"); -#if 0 - /* get mercury buffer where data is */ - if(HG_SUCCESS != HG_Bulk_handle_access(vl_data_handle, 0, buf_size, - HG_BULK_READWRITE, 1, &buf, &buf_size, NULL)) - HGOTO_ERROR_FF(FAIL, "Could not access handle"); -#endif + assert(buf_size == bulk_size); + } + else { + if(NULL == (buf = malloc(buf_size))) + HGOTO_ERROR_FF(FAIL, "can't allocate data buffer"); + + /* Create bulk handle */ + if(HG_SUCCESS != HG_Bulk_handle_create(1, &buf, &buf_size, + HG_BULK_READWRITE, &vl_data_handle)) + HGOTO_ERROR_FF(FAIL, "can't create bulk handle"); + + /* Pull data from the client */ + if(HG_SUCCESS != HG_Bulk_transfer(HG_BULK_PULL, source, bulk_handle, 0, + vl_data_handle, 0, buf_size, &bulk_request)) + HGOTO_ERROR_FF(FAIL, "Transfer data failed"); + + /* Wait for bulk data read to complete */ + if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) + HGOTO_ERROR_FF(FAIL, "can't wait for bulk data operation"); + } /* set other parameters needed to do IO */ udata.coh = coh; @@ -1749,6 +1787,10 @@ H5VL__iod_server_vl_data_write(iod_handle_t coh, iod_obj_id_t iod_id, iod_handle if(HG_SUCCESS != HG_Bulk_handle_free(vl_data_handle)) HGOTO_ERROR_FF(FAIL, "can't free vlen bulk handle"); done: + if(buf) { + free(buf); + buf = NULL; + } return ret_value; }/* end H5VL__iod_server_vl_data_write */ diff --git a/src/H5VLiod_map.c b/src/H5VLiod_map.c index 61f004b..5f6256a 100644 --- a/src/H5VLiod_map.c +++ b/src/H5VLiod_map.c @@ -450,6 +450,8 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine, iod_trans_id_t rtid = input->rcxt_num; uint32_t cs_scope = input->cs_scope; na_addr_t source = HG_Handler_get_addr(op_data->hg_handle); /* source address to pull data from */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, source); hg_bulk_t bulk_block_handle; /* HG block handle */ hg_bulk_request_t bulk_request; /* HG request */ size_t key_size, val_size, new_val_size; @@ -529,11 +531,11 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine, #endif /* adjust buffers for datatype conversion */ - if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id, + if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id, is_coresident, key.buf_size, &key.buf, &key_is_vl_data, &key_size) < 0) HGOTO_ERROR_FF(FAIL, "data type conversion failed"); - if(H5VL__iod_server_adjust_buffer(val_memtype_id, val_maptype_id, 1, dxpl_id, + if(H5VL__iod_server_adjust_buffer(val_memtype_id, val_maptype_id, 1, dxpl_id, is_coresident, val_size, &val_buf, &val_is_vl_data, &new_val_size) < 0) HGOTO_ERROR_FF(FAIL, "data type conversion failed"); @@ -668,6 +670,8 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, hg_bulk_t bulk_block_handle; /* HG block handle */ hg_bulk_request_t bulk_request; /* HG request */ na_addr_t dest = HG_Handler_get_addr(op_data->hg_handle); /* destination address to push data to */ + na_class_t *na_class = HG_Handler_get_na_class(op_data->hg_handle); /* NA transfer class */ + na_bool_t is_coresident = NA_Addr_is_self(na_class, dest); hbool_t opened_locally = FALSE; iod_checksum_t kv_cs[2]; iod_ret_t ret; @@ -708,7 +712,7 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, HGOTO_ERROR_FF(FAIL, "can't get scope for data integrity checks"); /* adjust buffers for datatype conversion */ - if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id, + if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id, 0, key.buf_size, &key.buf, &key_is_vl, &key_size) < 0) HGOTO_ERROR_FF(FAIL, "data type conversion failed"); @@ -810,7 +814,7 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, } /* adjust buffers for datatype conversion */ - if(H5VL__iod_server_adjust_buffer(val_memtype_id, val_maptype_id, 1, dxpl_id, + if(H5VL__iod_server_adjust_buffer(val_memtype_id, val_maptype_id, 1, dxpl_id, is_coresident, (size_t)src_size, &val_buf, &val_is_vl, &val_size) < 0) HGOTO_ERROR_FF(FAIL, "data type conversion failed"); @@ -1010,7 +1014,7 @@ H5VL_iod_server_map_exists_cb(AXE_engine_t UNUSED axe_engine, } /* adjust buffers for datatype conversion */ - if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, H5P_DEFAULT, + if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, H5P_DEFAULT, 0, key.buf_size, &key.buf, &is_vl_data, &key_size) < 0) HGOTO_ERROR_FF(FAIL, "data type conversion failed"); @@ -1101,7 +1105,7 @@ H5VL_iod_server_map_delete_cb(AXE_engine_t UNUSED axe_engine, } /* adjust buffers for datatype conversion */ - if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, H5P_DEFAULT, + if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, H5P_DEFAULT, 0, key.buf_size, &key.buf, &is_vl_data, &key_size) < 0) HGOTO_ERROR_FF(FAIL, "data type conversion failed"); diff --git a/src/H5VLiod_server.c b/src/H5VLiod_server.c index 2dd16c4..ce6a84b 100644 --- a/src/H5VLiod_server.c +++ b/src/H5VLiod_server.c @@ -382,7 +382,7 @@ EFF_setup_coresident(MPI_Comm comm, MPI_Info UNUSED info) return FAIL; /* Set number of threads in AXE engine */ - if(AXEset_num_threads(&engine_attr, 16) != AXE_SUCCEED) + if(AXEset_num_threads(&engine_attr, 2) != AXE_SUCCEED) return FAIL; /* Create AXE engine */ diff --git a/src/H5VLiod_server.h b/src/H5VLiod_server.h index f427bac..afadffb 100644 --- a/src/H5VLiod_server.h +++ b/src/H5VLiod_server.h @@ -404,7 +404,8 @@ H5_DLL herr_t H5VL_iod_insert_new_link(iod_handle_t oh, iod_trans_id_t tid, cons H5_DLL herr_t H5VL_iod_get_metadata(iod_handle_t oh, iod_trans_id_t tid, H5VL_iod_metadata_t md_type, const char *key, uint32_t cs_scope, iod_event_t *event, void *ret); H5_DLL herr_t H5VL__iod_server_adjust_buffer(hid_t from_type_id, hid_t to_type_id, size_t nelmts, - hid_t dxpl_id, size_t size, void **buf, + hid_t dxpl_id, na_bool_t is_coresident, + size_t size, void **buf, hbool_t *is_vl_data, size_t *_buf_size); H5_DLL herr_t H5VL_iod_verify_scratch_pad(scratch_pad *sp, iod_checksum_t iod_cs); H5_DLL herr_t H5VL_iod_verify_kv_pair(void *key, iod_size_t key_size, void *value, iod_size_t val_size, diff --git a/src/H5VLiod_util.c b/src/H5VLiod_util.c index 58d205e..b5060c2 100644 --- a/src/H5VLiod_util.c +++ b/src/H5VLiod_util.c @@ -1141,7 +1141,7 @@ done: */ herr_t H5VL__iod_server_adjust_buffer(hid_t mem_type_id, hid_t dset_type_id, size_t nelmts, - hid_t UNUSED dxpl_id, size_t size, void **buf, + hid_t UNUSED dxpl_id, na_bool_t is_coresident, size_t size, void **buf, hbool_t *is_vl_data, size_t *_buf_size) { herr_t ret_value = SUCCEED; @@ -1178,8 +1178,21 @@ H5VL__iod_server_adjust_buffer(hid_t mem_type_id, hid_t dset_type_id, size_t nel if(mem_type_size < dset_type_size) { buf_size = dset_type_size * nelmts; - if(NULL == (*buf = realloc(*buf, (size_t)buf_size))) - HGOTO_ERROR_FF(FAIL, "Can't adjust buffer for DT conversion"); + /* if we are coresident, and buffer extension is + required, make a new buffer so we don't mess + with the user buffer. */ + if(is_coresident) { + void *new_buf = NULL; + + if(NULL == (new_buf = malloc((size_t)buf_size))) + HGOTO_ERROR_FF(FAIL, "Can't malloc new buffer for DT conversion"); + memcpy(new_buf, *buf, *_buf_size); + *buf = new_buf; + } + else { + if(NULL == (*buf = realloc(*buf, (size_t)buf_size))) + HGOTO_ERROR_FF(FAIL, "Can't adjust buffer for DT conversion"); + } #if H5_EFF_DEBUG fprintf(stderr, "Adjusted Buffer size for dt conversion from %zu to %lld\n", size, buf_size); -- cgit v0.12