diff options
author | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2013-09-21 22:39:49 (GMT) |
---|---|---|
committer | Mohamad Chaarawi <chaarawi@hdfgroup.org> | 2013-09-21 22:39:49 (GMT) |
commit | 2a521de12e640841062bb1525d41bedf59ddb41d (patch) | |
tree | 8118d2148b79029c3d7a8fb9d806d25adedce783 /src/H5VLiod_map.c | |
parent | 1f72f24e93870b28767b7570e1063229f87588d5 (diff) | |
download | hdf5-2a521de12e640841062bb1525d41bedf59ddb41d.zip hdf5-2a521de12e640841062bb1525d41bedf59ddb41d.tar.gz hdf5-2a521de12e640841062bb1525d41bedf59ddb41d.tar.bz2 |
[svn-r24184] Use bulk data transfer for Values.
Diffstat (limited to 'src/H5VLiod_map.c')
-rw-r--r-- | src/H5VLiod_map.c | 167 |
1 files changed, 146 insertions, 21 deletions
diff --git a/src/H5VLiod_map.c b/src/H5VLiod_map.c index 7bd9141..2992c31 100644 --- a/src/H5VLiod_map.c +++ b/src/H5VLiod_map.c @@ -337,15 +337,21 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine, hid_t key_maptype_id = input->key_maptype_id; hid_t val_maptype_id = input->val_maptype_id; binary_buf_t key = input->key; - binary_buf_t val = input->val; + hg_bulk_t value_handle = input->val_handle; /* bulk handle for data */ + uint32_t value_cs = input->val_checksum; /* checksum recieved for data */ hid_t dxpl_id = input->dxpl_id; iod_trans_id_t wtid = input->trans_num; iod_trans_id_t rtid = input->rcxt_num; uint32_t cs_scope = input->cs_scope; - size_t key_size, val_size; + na_addr_t source = HG_Handler_get_addr(op_data->hg_handle); /* source address to pull data from */ + hg_bulk_block_t bulk_block_handle; /* HG block handle */ + hg_bulk_request_t bulk_request; /* HG request */ + size_t key_size, val_size, new_val_size; + void *val_buf = NULL; iod_kv_t kv; + uint32_t raw_cs_scope; hbool_t opened_locally = FALSE; - hbool_t is_vl_data = FALSE; + hbool_t val_is_vl_data = FALSE, key_is_vl_data = FALSE; herr_t ret_value = SUCCEED; FUNC_ENTER_NOAPI_NOINIT @@ -361,20 +367,67 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine, opened_locally = TRUE; } + if(H5P_DEFAULT == input->dxpl_id) + input->dxpl_id = H5Pcopy(H5P_DATASET_XFER_DEFAULT); + dxpl_id = input->dxpl_id; + + /* retrieve size of incoming bulk data */ + val_size = HG_Bulk_handle_get_size(value_handle); + + /* allocate buffer to hold data */ + if(NULL == (val_buf = malloc(val_size))) + HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate read buffer"); + + /* create a Mercury block handle for transfer */ + HG_Bulk_block_handle_create(val_buf, val_size, HG_BULK_READWRITE, &bulk_block_handle); + + /* Write bulk data here and wait for the data to be there */ + if(HG_SUCCESS != HG_Bulk_read_all(source, value_handle, bulk_block_handle, &bulk_request)) + HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't get data from function shipper"); + /* wait for it to complete */ + if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) + HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't get data from function shipper"); + + /* free the bds block handle */ + if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle)) + HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't free bds block handle"); + + /* get the scope for data integrity checks for raw data */ + if(H5Pget_rawdata_integrity_scope(dxpl_id, &raw_cs_scope) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "can't get scope for data integrity checks"); + + /* verify data if transfer flag is set */ + if(raw_cs_scope & H5_CHECKSUM_TRANSFER) { + uint32_t data_cs; + + data_cs = H5checksum(val_buf, val_size, NULL); + if(value_cs != data_cs) { + fprintf(stderr, "Errrr.. Network transfer Data corruption. expecting %u, got %u\n", + value_cs, data_cs); + ret_value = FAIL; + goto done; + } + } +#if H5VL_IOD_DEBUG + else { + fprintf(stderr, "NO TRANSFER DATA INTEGRITY CHECKS ON RAW DATA\n"); + } +#endif + /* adjust buffers for datatype conversion */ if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id, - key.buf_size, &key.buf, &is_vl_data, &key_size) < 0) + key.buf_size, &key.buf, &key_is_vl_data, &key_size) < 0) HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed"); if(H5VL__iod_server_adjust_buffer(val_memtype_id, val_maptype_id, 1, dxpl_id, - val.buf_size, &val.buf, &is_vl_data, &val_size) < 0) + val_size, &val_buf, &val_is_vl_data, &new_val_size) < 0) HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed"); /* MSC - fake debugging */ - if(is_vl_data) { + if(val_is_vl_data) { H5T_class_t class; size_t seq_len = val_size, u; - uint8_t *buf_ptr = (uint8_t *)val.buf; + uint8_t *buf_ptr = (uint8_t *)val_buf; class = H5Tget_class(val_memtype_id); @@ -389,10 +442,24 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine, fprintf(stderr, "\n"); } } + else { + fprintf(stderr, "Map Set value = %d; size = %zu\n", *((int *)val_buf), val_size); + } + + if(!key_is_vl_data) { + /* convert data if needed */ + if(H5Tconvert(key_memtype_id, key_maptype_id, 1, key.buf, NULL, dxpl_id) < 0) + HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed"); + } + if(!val_is_vl_data) { + if(H5Tconvert(val_memtype_id, val_maptype_id, 1, val_buf, NULL, dxpl_id) < 0) + HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed"); + } + /* MSC - do IOD checksum - can't now */ kv.key = key.buf; - kv.value = val.buf; - kv.value_len = (iod_size_t)val_size; + kv.value = val_buf; + kv.value_len = (iod_size_t)new_val_size; /* insert kv pair into MAP */ if (iod_kv_set(iod_oh, wtid, NULL, &kv, NULL, NULL) < 0) HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't set KV pair in parent"); @@ -405,6 +472,9 @@ done: input = (map_set_in_t *)H5MM_xfree(input); op_data = (op_data_t *)H5MM_xfree(op_data); + if(val_buf) + free(val_buf); + /* close the map if we opened it in this routine */ if(opened_locally) { if(iod_obj_close(iod_oh, NULL, NULL)) @@ -454,10 +524,15 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, uint32_t cs_scope = input->cs_scope; hbool_t val_is_vl = input->val_is_vl; size_t client_val_buf_size = input->val_size; + hg_bulk_t value_handle = input->val_handle; /* bulk handle for data */ + uint32_t raw_cs_scope; hbool_t key_is_vl = FALSE; size_t key_size, val_size; iod_size_t src_size; void *val_buf = NULL; + hg_bulk_block_t bulk_block_handle; /* HG block handle */ + hg_bulk_request_t bulk_request; /* HG request */ + na_addr_t dest = HG_Handler_get_addr(op_data->hg_handle); /* destination address to push data to */ hbool_t opened_locally = FALSE; herr_t ret_value = SUCCEED; @@ -474,6 +549,14 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, opened_locally = TRUE; } + if(H5P_DEFAULT == input->dxpl_id) + input->dxpl_id = H5Pcopy(H5P_DATASET_XFER_DEFAULT); + dxpl_id = input->dxpl_id; + + /* get the scope for data integrity checks for raw data */ + if(H5Pget_rawdata_integrity_scope(dxpl_id, &raw_cs_scope) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "can't get scope for data integrity checks"); + /* adjust buffers for datatype conversion */ if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id, key.buf_size, &key.buf, &key_is_vl, &key_size) < 0) @@ -555,18 +638,34 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, output.ret = ret_value; output.val_size = src_size; - if(client_val_buf_size) { - output.val.val_size = src_size; - output.val.val = val_buf; + if(raw_cs_scope) { + /* calculate a checksum for the data to be sent */ + output.val_cs = H5checksum(val_buf, (size_t)src_size, NULL); } +#if H5VL_IOD_DEBUG else { - output.val.val_size = 0; - output.val.val = NULL; + fprintf(stderr, "NO TRANSFER DATA INTEGRITY CHECKS ON RAW DATA\n"); + } +#endif + if(client_val_buf_size) { + /* Create a new block handle to write the data */ + HG_Bulk_block_handle_create(val_buf, (size_t)src_size, HG_BULK_READ_ONLY, &bulk_block_handle); + + /* Write bulk data here and wait for the data to be there */ + if(HG_SUCCESS != HG_Bulk_write_all(dest, value_handle, bulk_block_handle, &bulk_request)) + HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object"); + /* wait for it to complete */ + if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) + HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object"); + + /* free block handle */ + if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle)) + HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't free bds block handle"); } } else { - /* MSC - fake something */ - src_size = 4; + /* retrieve size of bulk data asked for to be read */ + src_size = HG_Bulk_handle_get_size(value_handle); if(NULL == (val_buf = malloc((size_t)src_size))) HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate buffer"); @@ -580,13 +679,39 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine, (size_t)src_size, &val_buf, &val_is_vl, &val_size) < 0) HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed"); + /* do data conversion */ + if(H5Tconvert(val_maptype_id, val_memtype_id, 1, val_buf, NULL, dxpl_id) < 0) + HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed"); + /* MSC - fake something */ *((int *)val_buf) = 1024; - output.val_size = src_size; - output.val.val_size = val_size; - output.val.val = val_buf; + if(raw_cs_scope) { + /* calculate a checksum for the data to be sent */ + output.val_cs = H5checksum(val_buf, val_size, NULL); + } +#if H5VL_IOD_DEBUG + else { + fprintf(stderr, "NO TRANSFER DATA INTEGRITY CHECKS ON RAW DATA\n"); + } +#endif + + output.val_size = val_size; output.ret = ret_value; + + /* Create a new block handle to write the data */ + HG_Bulk_block_handle_create(val_buf, val_size, HG_BULK_READ_ONLY, &bulk_block_handle); + + /* Write bulk data here and wait for the data to be there */ + if(HG_SUCCESS != HG_Bulk_write_all(dest, value_handle, bulk_block_handle, &bulk_request)) + HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object"); + /* wait for it to complete */ + if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE)) + HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object"); + + /* free block handle */ + if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle)) + HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't free bds block handle"); } #if H5VL_IOD_DEBUG @@ -600,8 +725,8 @@ done: if(ret_value < 0) { output.ret = FAIL; - output.val.val_size = 0; - output.val.val = NULL; + output.val_size = 0; + output.val_cs = 0; if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &output)) HDONE_ERROR(H5E_SYM, H5E_CANTGET, FAIL, "can't send result of map get"); } |