summaryrefslogtreecommitdiffstats
path: root/src/H5VLiod_map.c
diff options
context:
space:
mode:
authorMohamad Chaarawi <chaarawi@hdfgroup.org>2013-09-21 22:39:49 (GMT)
committerMohamad Chaarawi <chaarawi@hdfgroup.org>2013-09-21 22:39:49 (GMT)
commit2a521de12e640841062bb1525d41bedf59ddb41d (patch)
tree8118d2148b79029c3d7a8fb9d806d25adedce783 /src/H5VLiod_map.c
parent1f72f24e93870b28767b7570e1063229f87588d5 (diff)
downloadhdf5-2a521de12e640841062bb1525d41bedf59ddb41d.zip
hdf5-2a521de12e640841062bb1525d41bedf59ddb41d.tar.gz
hdf5-2a521de12e640841062bb1525d41bedf59ddb41d.tar.bz2
[svn-r24184] Use bulk data transfer for Values.
Diffstat (limited to 'src/H5VLiod_map.c')
-rw-r--r--src/H5VLiod_map.c167
1 files changed, 146 insertions, 21 deletions
diff --git a/src/H5VLiod_map.c b/src/H5VLiod_map.c
index 7bd9141..2992c31 100644
--- a/src/H5VLiod_map.c
+++ b/src/H5VLiod_map.c
@@ -337,15 +337,21 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine,
hid_t key_maptype_id = input->key_maptype_id;
hid_t val_maptype_id = input->val_maptype_id;
binary_buf_t key = input->key;
- binary_buf_t val = input->val;
+ hg_bulk_t value_handle = input->val_handle; /* bulk handle for data */
+ uint32_t value_cs = input->val_checksum; /* checksum recieved for data */
hid_t dxpl_id = input->dxpl_id;
iod_trans_id_t wtid = input->trans_num;
iod_trans_id_t rtid = input->rcxt_num;
uint32_t cs_scope = input->cs_scope;
- size_t key_size, val_size;
+ na_addr_t source = HG_Handler_get_addr(op_data->hg_handle); /* source address to pull data from */
+ hg_bulk_block_t bulk_block_handle; /* HG block handle */
+ hg_bulk_request_t bulk_request; /* HG request */
+ size_t key_size, val_size, new_val_size;
+ void *val_buf = NULL;
iod_kv_t kv;
+ uint32_t raw_cs_scope;
hbool_t opened_locally = FALSE;
- hbool_t is_vl_data = FALSE;
+ hbool_t val_is_vl_data = FALSE, key_is_vl_data = FALSE;
herr_t ret_value = SUCCEED;
FUNC_ENTER_NOAPI_NOINIT
@@ -361,20 +367,67 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine,
opened_locally = TRUE;
}
+ if(H5P_DEFAULT == input->dxpl_id)
+ input->dxpl_id = H5Pcopy(H5P_DATASET_XFER_DEFAULT);
+ dxpl_id = input->dxpl_id;
+
+ /* retrieve size of incoming bulk data */
+ val_size = HG_Bulk_handle_get_size(value_handle);
+
+ /* allocate buffer to hold data */
+ if(NULL == (val_buf = malloc(val_size)))
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate read buffer");
+
+ /* create a Mercury block handle for transfer */
+ HG_Bulk_block_handle_create(val_buf, val_size, HG_BULK_READWRITE, &bulk_block_handle);
+
+ /* Write bulk data here and wait for the data to be there */
+ if(HG_SUCCESS != HG_Bulk_read_all(source, value_handle, bulk_block_handle, &bulk_request))
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't get data from function shipper");
+ /* wait for it to complete */
+ if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't get data from function shipper");
+
+ /* free the bds block handle */
+ if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle))
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't free bds block handle");
+
+ /* get the scope for data integrity checks for raw data */
+ if(H5Pget_rawdata_integrity_scope(dxpl_id, &raw_cs_scope) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "can't get scope for data integrity checks");
+
+ /* verify data if transfer flag is set */
+ if(raw_cs_scope & H5_CHECKSUM_TRANSFER) {
+ uint32_t data_cs;
+
+ data_cs = H5checksum(val_buf, val_size, NULL);
+ if(value_cs != data_cs) {
+ fprintf(stderr, "Errrr.. Network transfer Data corruption. expecting %u, got %u\n",
+ value_cs, data_cs);
+ ret_value = FAIL;
+ goto done;
+ }
+ }
+#if H5VL_IOD_DEBUG
+ else {
+ fprintf(stderr, "NO TRANSFER DATA INTEGRITY CHECKS ON RAW DATA\n");
+ }
+#endif
+
/* adjust buffers for datatype conversion */
if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id,
- key.buf_size, &key.buf, &is_vl_data, &key_size) < 0)
+ key.buf_size, &key.buf, &key_is_vl_data, &key_size) < 0)
HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed");
if(H5VL__iod_server_adjust_buffer(val_memtype_id, val_maptype_id, 1, dxpl_id,
- val.buf_size, &val.buf, &is_vl_data, &val_size) < 0)
+ val_size, &val_buf, &val_is_vl_data, &new_val_size) < 0)
HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed");
/* MSC - fake debugging */
- if(is_vl_data) {
+ if(val_is_vl_data) {
H5T_class_t class;
size_t seq_len = val_size, u;
- uint8_t *buf_ptr = (uint8_t *)val.buf;
+ uint8_t *buf_ptr = (uint8_t *)val_buf;
class = H5Tget_class(val_memtype_id);
@@ -389,10 +442,24 @@ H5VL_iod_server_map_set_cb(AXE_engine_t UNUSED axe_engine,
fprintf(stderr, "\n");
}
}
+ else {
+ fprintf(stderr, "Map Set value = %d; size = %zu\n", *((int *)val_buf), val_size);
+ }
+
+ if(!key_is_vl_data) {
+ /* convert data if needed */
+ if(H5Tconvert(key_memtype_id, key_maptype_id, 1, key.buf, NULL, dxpl_id) < 0)
+ HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed");
+ }
+ if(!val_is_vl_data) {
+ if(H5Tconvert(val_memtype_id, val_maptype_id, 1, val_buf, NULL, dxpl_id) < 0)
+ HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed");
+ }
+ /* MSC - do IOD checksum - can't now */
kv.key = key.buf;
- kv.value = val.buf;
- kv.value_len = (iod_size_t)val_size;
+ kv.value = val_buf;
+ kv.value_len = (iod_size_t)new_val_size;
/* insert kv pair into MAP */
if (iod_kv_set(iod_oh, wtid, NULL, &kv, NULL, NULL) < 0)
HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't set KV pair in parent");
@@ -405,6 +472,9 @@ done:
input = (map_set_in_t *)H5MM_xfree(input);
op_data = (op_data_t *)H5MM_xfree(op_data);
+ if(val_buf)
+ free(val_buf);
+
/* close the map if we opened it in this routine */
if(opened_locally) {
if(iod_obj_close(iod_oh, NULL, NULL))
@@ -454,10 +524,15 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine,
uint32_t cs_scope = input->cs_scope;
hbool_t val_is_vl = input->val_is_vl;
size_t client_val_buf_size = input->val_size;
+ hg_bulk_t value_handle = input->val_handle; /* bulk handle for data */
+ uint32_t raw_cs_scope;
hbool_t key_is_vl = FALSE;
size_t key_size, val_size;
iod_size_t src_size;
void *val_buf = NULL;
+ hg_bulk_block_t bulk_block_handle; /* HG block handle */
+ hg_bulk_request_t bulk_request; /* HG request */
+ na_addr_t dest = HG_Handler_get_addr(op_data->hg_handle); /* destination address to push data to */
hbool_t opened_locally = FALSE;
herr_t ret_value = SUCCEED;
@@ -474,6 +549,14 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine,
opened_locally = TRUE;
}
+ if(H5P_DEFAULT == input->dxpl_id)
+ input->dxpl_id = H5Pcopy(H5P_DATASET_XFER_DEFAULT);
+ dxpl_id = input->dxpl_id;
+
+ /* get the scope for data integrity checks for raw data */
+ if(H5Pget_rawdata_integrity_scope(dxpl_id, &raw_cs_scope) < 0)
+ HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "can't get scope for data integrity checks");
+
/* adjust buffers for datatype conversion */
if(H5VL__iod_server_adjust_buffer(key_memtype_id, key_maptype_id, 1, dxpl_id,
key.buf_size, &key.buf, &key_is_vl, &key_size) < 0)
@@ -555,18 +638,34 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine,
output.ret = ret_value;
output.val_size = src_size;
- if(client_val_buf_size) {
- output.val.val_size = src_size;
- output.val.val = val_buf;
+ if(raw_cs_scope) {
+ /* calculate a checksum for the data to be sent */
+ output.val_cs = H5checksum(val_buf, (size_t)src_size, NULL);
}
+#if H5VL_IOD_DEBUG
else {
- output.val.val_size = 0;
- output.val.val = NULL;
+ fprintf(stderr, "NO TRANSFER DATA INTEGRITY CHECKS ON RAW DATA\n");
+ }
+#endif
+ if(client_val_buf_size) {
+ /* Create a new block handle to write the data */
+ HG_Bulk_block_handle_create(val_buf, (size_t)src_size, HG_BULK_READ_ONLY, &bulk_block_handle);
+
+ /* Write bulk data here and wait for the data to be there */
+ if(HG_SUCCESS != HG_Bulk_write_all(dest, value_handle, bulk_block_handle, &bulk_request))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+ /* wait for it to complete */
+ if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+
+ /* free block handle */
+ if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't free bds block handle");
}
}
else {
- /* MSC - fake something */
- src_size = 4;
+ /* retrieve size of bulk data asked for to be read */
+ src_size = HG_Bulk_handle_get_size(value_handle);
if(NULL == (val_buf = malloc((size_t)src_size)))
HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate buffer");
@@ -580,13 +679,39 @@ H5VL_iod_server_map_get_cb(AXE_engine_t UNUSED axe_engine,
(size_t)src_size, &val_buf, &val_is_vl, &val_size) < 0)
HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed");
+ /* do data conversion */
+ if(H5Tconvert(val_maptype_id, val_memtype_id, 1, val_buf, NULL, dxpl_id) < 0)
+ HGOTO_ERROR(H5E_DATATYPE, H5E_CANTINIT, FAIL, "data type conversion failed");
+
/* MSC - fake something */
*((int *)val_buf) = 1024;
- output.val_size = src_size;
- output.val.val_size = val_size;
- output.val.val = val_buf;
+ if(raw_cs_scope) {
+ /* calculate a checksum for the data to be sent */
+ output.val_cs = H5checksum(val_buf, val_size, NULL);
+ }
+#if H5VL_IOD_DEBUG
+ else {
+ fprintf(stderr, "NO TRANSFER DATA INTEGRITY CHECKS ON RAW DATA\n");
+ }
+#endif
+
+ output.val_size = val_size;
output.ret = ret_value;
+
+ /* Create a new block handle to write the data */
+ HG_Bulk_block_handle_create(val_buf, val_size, HG_BULK_READ_ONLY, &bulk_block_handle);
+
+ /* Write bulk data here and wait for the data to be there */
+ if(HG_SUCCESS != HG_Bulk_write_all(dest, value_handle, bulk_block_handle, &bulk_request))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+ /* wait for it to complete */
+ if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+
+ /* free block handle */
+ if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't free bds block handle");
}
#if H5VL_IOD_DEBUG
@@ -600,8 +725,8 @@ done:
if(ret_value < 0) {
output.ret = FAIL;
- output.val.val_size = 0;
- output.val.val = NULL;
+ output.val_size = 0;
+ output.val_cs = 0;
if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &output))
HDONE_ERROR(H5E_SYM, H5E_CANTGET, FAIL, "can't send result of map get");
}