summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishwanath Venkatesan <vish@hdfgroup.org>2013-07-12 23:49:55 (GMT)
committerVishwanath Venkatesan <vish@hdfgroup.org>2013-07-12 23:49:55 (GMT)
commitfb13bdbd0b91fd07665b60ded99d3d187f6b0a2d (patch)
tree92bf8cc69ae25fa53e5cd45f1f3c004c496313f5
parenta8a349b0dcd299f37d3d94d93d688979b137c319 (diff)
downloadhdf5-fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d.zip
hdf5-fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d.tar.gz
hdf5-fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d.tar.bz2
[svn-r23894] End of day commit.
-rw-r--r--src/H5VLiod_compactor.c25
-rw-r--r--src/H5VLiod_dset.c264
2 files changed, 192 insertions, 97 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c
index 86c017d..91245ad 100644
--- a/src/H5VLiod_compactor.c
+++ b/src/H5VLiod_compactor.c
@@ -745,6 +745,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests,
if (!merge_flag){
merge_flag = 1;
}
+ list[request_list[i]].merged = USED_IN_MERGING;
last_merged = res_dataspace; /*update the last merged!*/
}
else{
@@ -837,6 +838,14 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests,
free(lselected_req);
lselected_req = NULL;
}
+ for (i = 0; i < original_requests; i ++){
+ fprintf(stderr,"%s:%d, Request :%d --> %d\n",
+ __FILE__,
+ __LINE__,
+ i,
+ list[i].merged);
+ }
+
*total_requests = original_requests;
done:
@@ -874,7 +883,7 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
size_t g_entries = 0, j, m_entries = 0;
size_t *glens = NULL, blck_cnt = 0, *mlen = NULL;
block_container_t *sf_block = NULL, *sm_block = NULL;
- int ret_value = CP_SUCCESS;
+ int ret_value = CP_SUCCESS, set_op_data = 0;
FUNC_ENTER_NOAPI(NULL)
@@ -890,6 +899,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
}
#endif
+
+
fblks = H5VL_iod_get_selected_fblocks_count(lselected_req,
num_selected,
list);
@@ -917,7 +928,10 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
for ( i = 0; i < num_requests; i++){
if (CP_SUCCESS ==
H5VL_iod_request_exist(request_list[i], lselected_req, num_selected)){
-
+ if (!set_op_data){
+ merged_request->op_data = list[request_list[i]].op_data;
+ set_op_data;
+ }
for (j = 0; j < list[request_list[i]].num_fblocks; j++){
sf_block[blck_cnt].offset = list[request_list[i]].fblocks[j].offset;
sf_block[blck_cnt].len = list[request_list[i]].fblocks[j].len;
@@ -1021,6 +1035,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
merged_request->fblocks[j].offset = goffsets[j];
merged_request->fblocks[j].len = glens[j];
}
+ merged_request->num_fblocks = m_entries;
+
/*
Then compare that with automatically generated offsets
(the offsets have to match) */
@@ -1044,7 +1060,10 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
merged_request->mblocks[j].len);
#endif
}
- } /*if num_selected > 1*/
+ merged_request->num_mblocks = mblks;
+ }
+
+ /*if num_selected > 1*/
else{
#if DEBUG_COMPACTOR
fprintf(stderr,"in %s:%d Should not be here with num_selected :%d \n",
diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c
index dc56cd0..215ebbd 100644
--- a/src/H5VLiod_dset.c
+++ b/src/H5VLiod_dset.c
@@ -1,4 +1,4 @@
- /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
* Copyright by The HDF Group. *
* Copyright by the Board of Trustees of the University of Illinois. *
* All rights reserved. *
@@ -626,6 +626,7 @@
FUNC_LEAVE_NOAPI_VOID
} /* end H5VL_iod_server_dset_read_cb() */
+
/*-------------------------------------------------------------------------
* Function: H5VL_iod_server_dset_compactor_cb
*
@@ -668,9 +669,7 @@
compactor_queue_flag = 0;
curr_queue = NULL;
pthread_mutex_unlock(&lock);
-
-
-
+
ret_value = H5VL_iod_create_request_list (cqueue,
&wlist,
&nentries,
@@ -685,7 +684,7 @@
}
#endif
-#if 0
+#if 1
for ( i = 0; i < ndatasets; i ++){
H5VL_iod_compact_requests (wlist, &nentries,dlist[i].num_requests,
dlist[i].requests);
@@ -730,9 +729,7 @@
*-------------------------------------------------------------------------
*/
-
-int
-H5VL_iod_server_compactor_write (void *_list, int num_requests)
+int H5VL_iod_server_compactor_write (void *_list, int num_requests)
{
@@ -748,15 +745,16 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests)
hg_bulk_t bulk_handle;
hid_t space_id, dst_id;
size_t size, dst_size;
- void *buf;
+ void *buf = NULL;
uint8_t *buf_ptr;
hssize_t num_descriptors = 0, n =0;
hbool_t opened_locally = FALSE;
iod_hyperslab_t *hslabs = NULL;
- iod_mem_desc_t mem_desc;
+ iod_mem_desc_t *mem_desc = NULL;
iod_array_iodesc_t file_desc;
- hsize_t num_bytes = 0;
- hsize_t num_elems = 0;
+ hsize_t num_bytes = 0, mem_reqs = 0, start_reqs = 0;
+ hsize_t num_elems = 0, k = 0, j=0;
+ hsize_t curr_j = 0, bytes_left = 0;
FUNC_ENTER_NOAPI_NOINIT
@@ -773,62 +771,82 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests)
#endif
for (request_counter = 0; request_counter < num_requests; request_counter++){
- op_data = (op_data_t *)list[request_counter].op_data;
- input = (dset_io_in_t *)op_data->input;
- iod_oh = input->iod_oh;
- iod_id = input->iod_id;
- bulk_handle = input->bulk_handle;
- space_id = list[request_counter].selection_id;
- dst_id = input->dset_type_id;
- dst_size = H5Tget_size(dst_id);
-
- if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0)
- HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion");
-
- fprintf (stderr,"space_id: %d, selection_id: %d, ndims : %d\n",
- space_id, list[request_counter].selection_id, ndims);
-
- if(iod_oh.cookie == IOD_OH_UNDEFINED) {
- if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0)
- HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group");
- opened_locally = TRUE;
- }
-
- if (list[request_counter].merged == NOT_MERGED){
- /*Even in the case its not merged the buffer was already extracted*/
- fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter);
- size = list[request_counter].mem_length;
- buf = (void *)list[request_counter].mem_buf;
- }
-
+ if (list[request_counter].merged != USED_IN_MERGING){
+
+ /*When the request was used in merging we have to just
+ manage the handle*/
+ if (list[request_counter].merged == NOT_MERGED){
+ op_data = (op_data_t *)list[request_counter].op_data;
+ input = (dset_io_in_t *)op_data->input;
+ iod_oh = input->iod_oh;
+ iod_id = input->iod_id;
+ bulk_handle = input->bulk_handle;
+ space_id = list[request_counter].selection_id;
+ dst_id = input->dset_type_id;
+ dst_size = H5Tget_size(dst_id);
+
+ fprintf (stderr,"space_id: %d, selection_id: %d\n",
+ space_id, list[request_counter].selection_id);
+
+ /*Even in the case its not merged the buffer was already extracted*/
+ fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter+1);
+ size = list[request_counter].mem_length;
+ buf = (void *)list[request_counter].mem_buf;
#if 1
- ptr = (int *) buf;
-
- fprintf(stderr,"COMPACTOR WRITE: Received a buffer or size %zd with values :",
- size);
- for(i=0;i<size/4;++i){
- fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n",
- i, ptr[i]);
- }
- fprintf(stderr, "\n");
+ ptr = (int *) buf;
+ fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :",
+ size);
+ for(i=0;i<size/4;++i){
+ fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n",
+ i, ptr[i]);
+ }
+ fprintf(stderr, "\n");
#endif
+ }
- /* get the rank of the dataspace */
-
-
- /* get the number of decriptors required, i.e. the numbers of iod
+ if (list[request_counter].merged == MERGED){
+ op_data = (op_data_t *)list[request_counter].op_data;
+ input = (dset_io_in_t *)op_data->input;
+ iod_oh = input->iod_oh;
+ iod_id = input->iod_id;
+ space_id = list[request_counter].selection_id;
+ dst_id = input->dset_type_id;
+ dst_size = H5Tget_size(dst_id);
+
+ fprintf (stderr,"space_id: %d, selection_id: %d\n",
+ space_id, list[request_counter].selection_id);
+
+ /*Even in the case its not merged the buffer was already extracted*/
+ fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", request_counter+1);
+ }
+
+
+ if(iod_oh.cookie == IOD_OH_UNDEFINED) {
+ if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0)
+ HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group");
+ opened_locally = TRUE;
+ }
+
+
+
+ /* get the rank of the dataspace */
+ if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0)
+ HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion");
+
+ /* get the number of decriptors required, i.e. the numbers of iod
I/O operations needed */
- if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0)
- HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
+ if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
+
+ fprintf (stderr,"num_descriptors : %llu, ndims: %d\n",
+ num_descriptors,
+ ndims);
+
+ /* allocate the IOD hyperslab descriptors needed */
+ if(NULL == (hslabs = (iod_hyperslab_t *)malloc
+ (sizeof(iod_hyperslab_t) * (size_t)num_descriptors)))
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors");
- fprintf (stderr,"num_descriptors : %llu, ndims: %d\n",
- num_descriptors,
- ndims);
- /* allocate the IOD hyperslab descriptors needed */
- if(NULL == (hslabs = (iod_hyperslab_t *)malloc
- (sizeof(iod_hyperslab_t) * (size_t)num_descriptors)))
- HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors");
-
for (n = 0; n < num_descriptors; n++){
hslabs[n].start = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims);
hslabs[n].stride = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims);
@@ -839,14 +857,15 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests)
/* generate the descriptors after allocating the array */
if(H5VL_iod_get_file_desc(space_id, &num_descriptors, hslabs) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
-
- buf_ptr = (uint8_t *)buf;
+ curr_j = 0;
for(n=0 ; n<num_descriptors ; n++) {
+
+ fprintf (stderr,
+ "Enters for num_descriptors : %llu, n : %llu\n",
+ num_descriptors, n);
- fprintf (stderr, "Enters for num_descriptors : %llu, n : %llu\n", num_descriptors, n);
num_bytes = 0;
num_elems = 0;
-
/* determine how many bytes the current descriptor holds */
for(i=0 ; i<ndims ; i++){
@@ -854,34 +873,89 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests)
num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]);
}
num_bytes = num_elems * dst_size;
-#if 0
- /* set the memory descriptor */
- mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t));
- mem_desc->nfrag = 1;
- mem_desc->frag[0].addr = (void *)buf_ptr;
- mem_desc->frag[0].len = (iod_size_t)num_bytes;
+
+ if (list[request_counter].merged == NOT_MERGED){
+ buf_ptr = (uint8_t *)buf;
+#if 1
+ /* set the memory descriptor */
+ mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t));
+ mem_desc->frag = (iod_mem_frag_t *) malloc (sizeof (iod_mem_desc_t));
+ mem_desc->nfrag = 1;
+ mem_desc->frag[0].addr = (void *)buf_ptr;
+ mem_desc->frag[0].len = (iod_size_t)num_bytes;
#endif
- buf_ptr += num_bytes;
- file_desc = hslabs[n];
+ buf_ptr += num_bytes;
#if H5VL_IOD_DEBUG
- for(i=0 ; i<ndims ; i++) {
- fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n",
- i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i],
- (size_t)file_desc.block[i], (size_t)file_desc.count[i]);
+ for(i=0 ; i<ndims ; i++) {
+ fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n",
+ i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i],
+ (size_t)file_desc.block[i], (size_t)file_desc.count[i]);
+ }
+#endif
+ if (NULL != buf){
+ free(buf);
+ }
}
-#endif
- /* write from array object */
- /* TODO: VV Change checksum once that is fixed*/
- if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL, NULL /*This is where the memory descriptor goes*/,
- &file_desc, NULL, NULL) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object");
- }
+ if (list[request_counter].merged == MERGED){
+ k = 0;
+ mem_reqs = 0; start_reqs = curr_j;
+ for (j = curr_j; j < list[i].num_mblocks; j++){
+ if (k < num_bytes){
+ if (bytes_left){
+ k += bytes_left;
+ mem_reqs++;
+ continue;
+ }
+ else if (k + list[i].mblocks[j].len > num_bytes){
+ bytes_left = list[i].mblocks[j].len - (num_bytes - k );
+ curr_j = j;
+ mem_reqs++;
+ break;
+ }
+ else{ /* <= case*/
+ k += list[i].mblocks[j].len;
+ curr_j = j;
+ mem_reqs++;
+ }
+ }
+ }
+ /*Determined the number of entries in the memory block for this hslab
+ descriptor*/
+ mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t));
+ mem_desc->nfrag = mem_reqs;
+ mem_desc->frag = (iod_mem_frag_t *) malloc (mem_reqs *
+ sizeof (iod_mem_desc_t));
+ k = 0;
+ for ( j = start_reqs; j < mem_reqs; j++){
+ mem_desc->frag[k].addr = list[i].mblocks[j].offset;
+ mem_desc->frag[k].len = list[i].mblocks[j].len;
+ }
+ }
+ }
+ /* write from array object */
+ /* TODO: VV Change checksum once that is fixed*/
+ file_desc = hslabs[n];
+ if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL,
+ mem_desc /*This is where the memory descriptor goes*/,
+ &file_desc,
+ NULL, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object");
+
+ if (NULL != mem_desc->frag){
+ free(mem_desc->frag);
+ }
+ if (NULL != mem_desc){
+ free(mem_desc);
+ mem_desc = NULL;
+ }
+
+ }
#if H5VL_IOD_DEBUG
fprintf(stderr, "Done with dset write, sending %d response to client\n", ret_value);
#endif
- free(buf);
+
/* free allocated descriptors */
for(n=0 ; n<num_descriptors ; n++) {
@@ -899,17 +973,19 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests)
HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't close Array object");
}
}
+
for (i = 0; i< num_requests; i++){
- op_data = (op_data_t *)list[i].op_data;
- input = (dset_io_in_t *)op_data->input;
- if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &ret_value))
- HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't send result of write to client");
- input = (dset_io_in_t *)H5MM_xfree(input);
- op_data = (op_data_t *)H5MM_xfree(op_data);
+ if (list[i].merged != MERGED){
+ op_data = (op_data_t *)list[i].op_data;
+ input = (dset_io_in_t *)op_data->input;
+ if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &ret_value))
+ HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't send result of write to client");
+ input = (dset_io_in_t *)H5MM_xfree(input);
+ op_data = (op_data_t *)H5MM_xfree(op_data);
+ }
}
done:
-
FUNC_LEAVE_NOAPI(ret_value);
}/*H5VL_iod_server_compactor_write*/