summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishwanath Venkatesan <vish@hdfgroup.org>2013-08-02 01:19:39 (GMT)
committerVishwanath Venkatesan <vish@hdfgroup.org>2013-08-02 01:19:39 (GMT)
commit66ce66962435a99717866cde54d7c5a75b68e1cf (patch)
tree0bb15020a237af0a5b0a3020981adfc5cd6b4291
parent38c702e3301c49e3de725d24be524aca38c6774d (diff)
downloadhdf5-66ce66962435a99717866cde54d7c5a75b68e1cf.zip
hdf5-66ce66962435a99717866cde54d7c5a75b68e1cf.tar.gz
hdf5-66ce66962435a99717866cde54d7c5a75b68e1cf.tar.bz2
[svn-r23962] 1. Compactor reads works
2. Native write fixes 3. Short-circuited writes implemented. Have to be tested tomorrow.
-rw-r--r--src/H5VLiod_compactor.c180
-rw-r--r--src/H5VLiod_compactor.h4
-rw-r--r--src/H5VLiod_compactor_queue.h5
-rw-r--r--src/H5VLiod_dset.c306
-rw-r--r--src/H5VLiod_server.c9
-rw-r--r--src/H5VLiod_server.h2
6 files changed, 459 insertions, 47 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c
index f6247ac..cfefb7a 100644
--- a/src/H5VLiod_compactor.c
+++ b/src/H5VLiod_compactor.c
@@ -91,7 +91,10 @@ static void H5VL_print_block_container (block_container_t *cont,
static int H5VL_check_overlapped_offsets(hsize_t start_i, hsize_t start_j,
hsize_t end_i, hsize_t end_j);
-
+
+static int H5VL_get_read_spread (hsize_t start_offset, hsize_t end_offset,
+ hsize_t *start_offsets, hsize_t *end_offsets,
+ int **requests, int *nreqs);
/*---------------------------------------------------------------------*/
@@ -304,14 +307,13 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
/*from the second id!*/
else{
-#if H5_DO_NATIVE
+ /*The same dataset could be opened,
+ in which case the ids can be different*/
H5Iget_name (current_dset, dname, 257);
H5Iget_name (dataset_id, dname1, 257);
+
if ( (current_dset == (hid_t)dataset_id) ||
(!(strcmp(dname, dname1))) ) {
-#else
- if (current_dset == (hid_t)dataset_id){
-#endif
#if DEBUG_COMPACTOR
fprintf(stderr, "in %s:%d current dataset: %d has the request %d at %d \n",
@@ -392,20 +394,18 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
if(NULL == (buf = malloc(buf_size)))
HGOTO_ERROR(H5E_HEAP, H5E_NOSPACE, CP_FAIL, "can't allocate read buffer");
- HG_Bulk_block_handle_create(buf, size, HG_BULK_READWRITE, &bulk_block_handle);
-
- /* Write bulk data here and wait for the data to be there */
- if(HG_SUCCESS != HG_Bulk_read_all(source, bulk_handle, bulk_block_handle, &bulk_request))
- HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "can't get data from function shipper");
-
- /* wait for it to complete */
- if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
- HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "can't get data from function shipper");
-
- /* free the bds block handle */
- if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle))
- HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "can't free bds block handle");
-
+ if (request_type == WRITE){
+ HG_Bulk_block_handle_create(buf, size, HG_BULK_READWRITE, &bulk_block_handle);
+ /* Write bulk data here and wait for the data to be there */
+ if(HG_SUCCESS != HG_Bulk_read_all(source, bulk_handle, bulk_block_handle, &bulk_request))
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "can't get data from function shipper");
+ /* wait for it to complete */
+ if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "can't get data from function shipper");
+ /* free the bds block handle */
+ if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle))
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "can't free bds block handle");
+ }
/***********************************************************************************/
/* extract offsets and lengths for this dataspace selection*/
@@ -434,7 +434,11 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
}
newlist[request_id].request_id = request_id;
- newlist[request_id].merged = NOT_MERGED;
+ if(request_type == WRITE)
+ newlist[request_id].merged = NOT_MERGED;
+ else
+ newlist[request_id].merged = NOT_SS;
+
newlist[request_id].num_fblocks = num_entries;
newlist[request_id].num_mblocks = num_entries;
@@ -515,7 +519,7 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
newlist[i].num_peers);
}
-
+
for ( i = 0; i < num_datasets; i++){
fprintf(stderr, "%s:%d dataset: %d has %d requests \n",
__FILE__, __LINE__,
@@ -525,7 +529,7 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
fprintf (stderr, "Compactor request %d\n", unique_datasets[i].requests[j]);
}
fprintf(stderr,"\n");
-
+
}
#endif
@@ -537,6 +541,138 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
FUNC_LEAVE_NOAPI(ret_value);
} /* end H5VL_iod_create_requests_list */
+/*--------------------------------------------------------------------------
+ * Function: H5VL_iod_short_circuit_reads
+ *
+ * Purpose : Function to check whether current read is avaible in current
+ * writes and returns the list of request that satisfy the
+ * current read
+ *
+ * Return : SUCCESS : 1 (found a matching write)
+ * FAILURE : 0 (no matching write found)
+ *
+ * Programmer : Vishwanath Venkatesan
+ * August, 2013
+ *--------------------------------------------------------------------------
+ */
+
+static
+int H5VL_get_read_spread (hsize_t start_offset, hsize_t end_offset,
+ hsize_t *start_offsets, hsize_t *end_offsets,
+ int **requests, int *nreqs){
+
+ int ret_value = CP_SUCCESS;
+ int i, reqs, *request_list;
+ hsize_t tmp_offset = 0;
+
+
+ FUNC_ENTER_NOAPI(NULL);
+
+ reqs = 0;
+ request_list = (int *) malloc (nentries * sizeof(int));
+ if (NULL == request_list){
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Memory allocation error for request_list");
+ }
+
+ start_offsets = (hsize_t *) malloc (nentries * sizeof(hsize_t));
+ if (NULL == start_offsets){
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Memory allocation error for start_offsets");
+ }
+
+ end_offsets = (hsize_t *) malloc (nentries * sizeof(hsize_t));
+ if (NULL == end_offsets){
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Memory allocation error for start_offsets");
+ }
+
+
+ for ( i = 0; i < nentries; i++){
+ if ((start_offset >= start_offsets[i]) &&
+ (end_offset <= end_offsets[i])){
+ request_list[reqs] = i;
+ reqs++;
+ start_offset = end_offset;
+ break;
+ }
+ if ((start_offset >= start_offsets[i]) &&
+ (end_offset > end_offsets[i])){
+ request_list[reqs] = i;
+ reqs++;
+ start_offset = end_offsets[i] + 1;
+ }
+ if (start_offset < start_offsets[i]){
+ continue;
+ }
+ }
+
+ if (start_offset != end_offset){
+ ret_value = 1;
+ *requests = request_list;
+ *nreqs = reqs;
+ }
+ else{
+ ret_value = 0;
+ }
+
+
+
+ done:
+ FUNC_LEAVE_NOAPI(ret_value);
+
+}
+
+
+/*--------------------------------------------------------------------------
+ * Function: H5VL_iod_short_circuit_reads
+ *
+ * Purpose : Function to check whether reads can be satisfied by
+ * preexisting writes
+ *
+ * Return : SUCCESS : CP_SUCCESS
+ * FAILURE : CP_FAIL
+ *
+ * Programmer : Vishwanath Venkatesan
+ * August, 2013
+ *--------------------------------------------------------------------------
+ */
+
+int H5VL_iod_short_circuit_reads (request_list_t *wlist, int nentries,
+ request_list_t *rlist, int nrentries){
+
+
+ int i, j, nreqs = 0;
+ hsize_t curr_start_offset, curr_end_offset;
+ int *requests = NULL;
+
+
+
+
+ for (i = 0; i < nrentries; i++){
+
+ curr_start_offset = rlist[0].fblocks[0].offset;
+ curr_end_offset = rlist[num_fblocks - 1].offset +
+ rlist[num_fblocks - 1].len - 1;
+
+ if (H5VL_get_read_spread (curr_start_offset,curr_end_offset,
+ start_offsets, end_offsets,
+ &requests, &nreqs) ){
+
+ for (j = 0; j < nreqs; j++){
+ memcpy(rlist[i].mem_buf,
+ wlist[request[j]].mem_buf,
+ wlist[request[j]].mem_len);
+ }
+ rlist[i].merged = SS;
+ }
+
+ }
+
+
+
+}
+
+
+
+
/*-------------------------------------------------------------------------
* Function: H5VL_iod_select_overlap
*
diff --git a/src/H5VLiod_compactor.h b/src/H5VLiod_compactor.h
index 355d537..45ebba2 100644
--- a/src/H5VLiod_compactor.h
+++ b/src/H5VLiod_compactor.h
@@ -57,7 +57,7 @@ typedef struct {
block_container_t *mblocks; /* Memory offset/len */
size_t num_fblocks; /* Number of File blocks */
size_t num_mblocks; /* Number of Memory blocks */
- /*Convenience values!*/
+ /*Convenience def*/
size_t elementsize; /* Size of each element in the dataset */
hid_t dataset_id; /* The ID of the dataset */
hid_t selection_id; /* The ID of the dataspace */
@@ -120,6 +120,8 @@ H5_DLL int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
int *changed, int *changed_cnt);
H5_DLL int H5VL_iod_free_memory_buffer (request_list_t *list, int num_requests);
+H5_DLL int H5VL_iod_short_circuit_reads (request_list_t *wlist, int nentries,
+ request_list_t *rlist, int nrentries);
/*---------------------------------------------------------------------------------------- */
diff --git a/src/H5VLiod_compactor_queue.h b/src/H5VLiod_compactor_queue.h
index 4057cc8..ac63487 100644
--- a/src/H5VLiod_compactor_queue.h
+++ b/src/H5VLiod_compactor_queue.h
@@ -22,7 +22,10 @@
#define MERGED 297
#define NOT_MERGED 298
#define USED_IN_MERGING 299
-
+/* Read states */
+#define NOT_SS 300;
+#define SS 301
+/* ---------------- */
#define CP_SUCCESS 0
#define CP_FAIL -1
diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c
index 35f022f..67068c2 100644
--- a/src/H5VLiod_dset.c
+++ b/src/H5VLiod_dset.c
@@ -362,6 +362,9 @@ done:
FUNC_LEAVE_NOAPI_VOID
} /* end H5VL_iod_server_dset_open_cb() */
+
+
+
/*-------------------------------------------------------------------------
* Function: H5VL_iod_server_dset_read_cb
@@ -490,16 +493,16 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
if(NULL == (ret_list = (iod_ret_t *)calloc
(sizeof(iod_ret_t), (size_t)num_descriptors)))
HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array");
-
+
/* Set up I/O list */
for(n=0 ; n<num_descriptors ; n++) {
- hsize_t num_bytes = 0;
- hsize_t num_elems = 1;
-
- /* determine how many bytes the current descriptor holds */
- for(i=0 ; i<ndims ; i++)
- num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]);
- num_bytes = num_elems * src_size;
+ hsize_t num_bytes = 0;
+ hsize_t num_elems = 1;
+
+ /* determine how many bytes the current descriptor holds */
+ for(i=0 ; i<ndims ; i++)
+ num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]);
+ num_bytes = num_elems * src_size;
#if 0
/* set the memory descriptor */
@@ -548,7 +551,7 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
#if H5_DO_NATIVE
ret_value = H5Dread(iod_oh.cookie, src_id, H5S_ALL, space_id, dxpl_id, buf);
#else /* fake data */
- for(i=0;i<60;++i)
+ for(i=0;i<64;++i)
ptr[i] = i;
#endif
@@ -570,11 +573,11 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
/* Create a new block handle to write the data */
HG_Bulk_block_handle_create(buf, size, HG_BULK_READ_ONLY, &bulk_block_handle);
-
+
/* Write bulk data here and wait for the data to be there */
if(HG_SUCCESS != HG_Bulk_write_all(dest, bulk_handle, bulk_block_handle, &bulk_request))
HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
- /* wait for it to complete */
+ /* wait for it to complete */
if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
@@ -705,17 +708,27 @@ done:
for ( i = 0; i < ndatasets; i ++){
- H5VL_iod_compact_requests (wlist, &nentries,dlist[i].num_requests,
+ H5VL_iod_compact_requests (wlist, &nentries,dlist[i].num_requests,
dlist[i].requests);
- if (CP_SUCCESS != H5VL_iod_server_compactor_write (wlist, nentries)){
+ if (CP_SUCCESS != H5VL_iod_server_compactor_write (wlist, nentries)){
#if DEBUG_COMPACTOR
- fprintf (stderr,"COMPACTOR CB: compactor write failed \n");
+ fprintf (stderr,"COMPACTOR CB: compactor write failed \n");
#endif
- HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever write failed\n");
- }
- }
-
- /*Call here "iod_array_write_list"
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever write failed\n");
+ }
+ }
+
+ for ( i = 0; i < nrdatasets; i++){
+
+ if (CP_SUCCESS != H5VL_iod_server_compactor_read (rlist, nrentries)){
+#if DEBUG_COMPACTOR
+ fprintf (stderr,"COMPACTOR CB: compactor read failed \n");
+#endif
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever read failed\n");
+
+ }
+ }
+ /*Call here "iod_array_write_list"
array_write array has been constructed.
The COH value can come from any of the list requests
@@ -724,7 +737,7 @@ done:
This sends results to all the clients about their completion.
*/
-
+
if (CP_SUCCESS != H5VL_iod_free_memory_buffer (wlist, nentries)){
HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Failed while freeing memory buffer\n");
}
@@ -755,7 +768,256 @@ done:
FUNC_LEAVE_NOAPI_VOID
} /*end H5VL_iod_server_dset_compactor_cb */
+/*------------------------------------------------------------------------
+ * Function: H5VL_iod_server_compactor_read
+ *
+ * Purpose: Compactor lower lever read operations
+ *
+ * Return: Success: SUCCEED
+ * Failure: Negative
+ *
+ * Programmer: Vishwanath Venkatesan
+ * August, 2013
+ *
+ *-------------------------------------------------------------------------
+ */
+
+int H5VL_iod_server_compactor_read (void *_list, int num_requests)
+{
+ int ret_value = CP_SUCCESS;
+ int i, request_counter= 0;
+ int ndims, *ptr = NULL;
+ request_list_t *list = (request_list_t *)_list;
+ op_data_t *op_data;
+ dset_io_in_t *input;
+ iod_handle_t coh, iod_oh;
+ iod_obj_id_t iod_id;
+ hid_t space_id, dst_id, src_id;
+ size_t size, dst_size, src_size;
+ size_t nelmts;
+ void *buf = NULL;
+ uint8_t *buf_ptr;
+ hg_bulk_t bulk_handle;
+ hg_bulk_block_t bulk_block_handle;
+ hg_bulk_request_t bulk_request;
+ hssize_t num_descriptors = 0, n =0;
+ hbool_t opened_locally = FALSE;
+ iod_hyperslab_t *hslabs = NULL;
+ iod_mem_desc_t *mem_desc = NULL;
+ iod_array_iodesc_t file_desc;
+ iod_array_io_t *io_array = NULL;
+ iod_checksum_t *cs_list = NULL;
+ iod_ret_t *ret_list = NULL;
+ dset_read_out_t output;
+ uint32_t cs = 0;
+ na_addr_t dest;
+ hbool_t flag;
+
+
+
+ FUNC_ENTER_NOAPI_NOINIT
+
+ if (num_requests <= 0){
+#if DEBUG_COMPACTOR
+ fprintf (stderr,"COMPACTOR_READ: Request < 0 We should not be here!\n");
+#endif
+ ret_value = CP_FAIL;
+ goto done;
+ }
+
+#if DEBUG_COMPACTOR
+ fprintf (stderr,"Entering COMPACTOR READ with requests %d\n", num_requests);
+#endif
+
+ for (request_counter = 0; request_counter < num_requests; request_counter++){
+
+ op_data = (op_data_t *)list[request_counter].op_data;
+ input = (dset_io_in_t *)op_data->input;
+ coh = input->coh;
+ iod_oh = input->iod_oh;
+ iod_id = input->iod_id;
+ bulk_handle = input->bulk_handle;
+ space_id = list[request_counter].selection_id;
+ dst_id = input->dset_type_id;
+ src_id = input->mem_type_id;
+ dst_size = H5Tget_size(dst_id);
+ src_size = H5Tget_size(src_id);
+ buf = list[request_counter].mem_buf;
+
+ /* open the dataset if we don't have the handle yet */
+ if(iod_oh.cookie == (int)IOD_OH_UNDEFINED) {
+ if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0)
+ HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group");
+ opened_locally = TRUE;
+ }
+
+ /* retrieve size of bulk data asked for to be read */
+ size = HG_Bulk_handle_get_size(bulk_handle);
+
+ /* get the number of points selected */
+ nelmts = (size_t)H5Sget_select_npoints(space_id);
+
+ if (list[request_counter].merged != SS){
+
+ /* get the rank of the dataspace */
+ if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0)
+ HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion");
+
+ /* get the number of decriptors required, i.e. the numbers of iod
+ I/O operations needed */
+ if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
+
+ /* allocate the IOD hyperslab descriptors needed */
+ if(NULL == (hslabs = (iod_hyperslab_t *)malloc
+ (sizeof(iod_hyperslab_t) * (size_t)num_descriptors)))
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors");
+
+ for(n=0 ; n<num_descriptors ; n++) {
+ hslabs[n].start = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims);
+ hslabs[n].stride = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims);
+ hslabs[n].block = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims);
+ hslabs[n].count = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims);
+ }
+
+ /* generate the descriptors after allocating the array */
+ if(H5VL_iod_get_file_desc(space_id, &num_descriptors, hslabs) < 0)
+ HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
+
+ buf_ptr = (uint8_t *)buf;
+
+ /* allocate the IOD array parameters for reading */
+ if(NULL == (io_array = (iod_array_io_t *)malloc
+ (sizeof(iod_array_io_t) * (size_t)num_descriptors)))
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array");
+
+ /* allocate cs array */
+ if(NULL == (cs_list = (iod_checksum_t *)calloc
+ (sizeof(iod_checksum_t), (size_t)num_descriptors)))
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate checksum array");
+
+ /* allocate return array */
+ if(NULL == (ret_list = (iod_ret_t *)calloc
+ (sizeof(iod_ret_t), (size_t)num_descriptors)))
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array");
+
+ /* Set up I/O list */
+ for(n=0 ; n<num_descriptors ; n++) {
+
+ hsize_t num_bytes = 0;
+ hsize_t num_elems = 1;
+
+ /* determine how many bytes the current descriptor holds */
+ for(i=0 ; i<ndims ; i++)
+ num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]);
+ num_bytes = num_elems * src_size;
+
+ mem_desc = (iod_mem_desc_t *)malloc(sizeof(iod_mem_desc_t) + sizeof(iod_mem_frag_t));
+ mem_desc->nfrag = 1;
+ mem_desc->frag[0].addr = (void *)buf_ptr;
+ mem_desc->frag[0].len = (iod_size_t)num_bytes;
+ buf_ptr += num_bytes;
+
+ /* set the file descriptor */
+ file_desc = hslabs[n];
+
+#if H5VL_IOD_DEBUG
+ for(i=0 ; i<ndims ; i++) {
+ fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n",
+ i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i],
+ (size_t)file_desc.block[i], (size_t)file_desc.count[i]);
+ }
+#endif
+ /* setup list I/O parameters */
+ io_array[n].oh = iod_oh;
+ io_array[n].hints = NULL;
+ io_array[n].mem_desc = mem_desc;
+ io_array[n].io_desc = &file_desc;
+ io_array[n].cs = &cs_list[n];
+ io_array[n].ret = &ret_list[n];
+ }
+
+ /* Read list IO */
+ if(iod_array_read_list(coh, IOD_TID_UNKNOWN, (iod_size_t)num_descriptors,
+ io_array, NULL) < 0)
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+
+ /* verify return values */
+ for(n=0 ; n<num_descriptors ; n++) {
+ if(ret_list[n] < 0)
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+ }
+
+ flag = FALSE;
+ ptr = (int *)buf;
+#if H5_DO_NATIVE
+ ret_value = H5Dread(iod_oh.cookie, src_id, H5S_ALL, space_id, H5P_DEFAULT, buf);
+#else /* fake data */
+ for(i=0;i<64;++i)
+ ptr[i] = i;
+#endif
+
+ }
+
+ /* Create a new block handle to write the data */
+ HG_Bulk_block_handle_create(buf, size, HG_BULK_READ_ONLY, &bulk_block_handle);
+
+ dest = HG_Handler_get_addr (op_data->hg_handle);
+
+ /* Write bulk data here and wait for the data to be there */
+ if(HG_SUCCESS != HG_Bulk_write_all(dest, bulk_handle, bulk_block_handle, &bulk_request))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+
+ /* wait for it to complete */
+ if(HG_SUCCESS != HG_Bulk_wait(bulk_request, HG_MAX_IDLE_TIME, HG_STATUS_IGNORE))
+ HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't read from array object");
+
+ done:
+ output.ret = ret_value;
+ output.cs = cs;
+
+#if H5VL_IOD_DEBUG
+ fprintf(stderr, "Done with dset read, checksum %u, sending response to client\n", cs);
+#endif
+ if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &output))
+ HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't send result of write to client");
+ if(HG_SUCCESS != HG_Bulk_block_handle_free(bulk_block_handle))
+ HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't free bds block handle");
+
+ input = (dset_io_in_t *)H5MM_xfree(input);
+ op_data = (op_data_t *)H5MM_xfree(op_data);
+
+ if(buf)
+ free(buf);
+
+ /* free allocated descriptors */
+ for(n=0 ; n<num_descriptors ; n++) {
+ free(hslabs[n].start);
+ free(hslabs[n].stride);
+ free(hslabs[n].block);
+ free(hslabs[n].count);
+ }
+ if(hslabs)
+ free(hslabs);
+ if(io_array)
+ free(io_array);
+ if(cs_list)
+ free(cs_list);
+ if(ret_list)
+ free(ret_list);
+
+ /* close the dataset if we opened it in this routine */
+ if(opened_locally) {
+ if(iod_obj_close(iod_oh, NULL, NULL))
+ HDONE_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't close Array object");
+ }
+
+ }
+
+ FUNC_LEAVE_NOAPI(ret_value);
+
+}/* end H5VL_iod_copactor_read */
/*------------------------------------------------------------------------
* Function: H5VL_iod_server_compactor_write
*
@@ -817,11 +1079,11 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
ret_value = CP_FAIL;
goto done;
}
-
+
#if DEBUG_COMPACTOR
fprintf (stderr,"Entering COMPACTOR WRITE with requests %d\n", num_requests);
#endif
-
+
for (request_counter = 0; request_counter < num_requests; request_counter++){
if (list[request_counter].merged != USED_IN_MERGING){
diff --git a/src/H5VLiod_server.c b/src/H5VLiod_server.c
index 066bd1d..3283055 100644
--- a/src/H5VLiod_server.c
+++ b/src/H5VLiod_server.c
@@ -1270,6 +1270,7 @@ H5VL_iod_server_dset_read(hg_handle_t handle)
op_data->hg_handle = handle;
op_data->input = (void *)input;
+#if 0
if(input->parent_axe_id) {
if (AXE_SUCCEED != AXEcreate_task(engine, input->axe_id,
1, &input->parent_axe_id, 0, NULL,
@@ -1281,6 +1282,14 @@ H5VL_iod_server_dset_read(hg_handle_t handle)
H5VL_iod_server_dset_read_cb, op_data, NULL))
HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, HG_FAIL, "can't insert task into async engine");
}
+#endif
+
+#if 1
+ if(CP_SUCCESS != H5VL_iod_server_dset_compactor(op_data, READ)){
+ HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, HG_FAIL, "compactor task failed for WRITE\n");
+ }
+#endif
+
done:
FUNC_LEAVE_NOAPI(ret_value)
diff --git a/src/H5VLiod_server.h b/src/H5VLiod_server.h
index 3653edc..6041116c 100644
--- a/src/H5VLiod_server.h
+++ b/src/H5VLiod_server.h
@@ -29,7 +29,7 @@
#ifdef H5_HAVE_EFF
#define EEXISTS 1
-#define H5_DO_NATIVE 1
+#define H5_DO_NATIVE 0
#define DEBUG_COMPACTOR 1
/* Key names for Metadata stored in KV objects */