summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishwanath Venkatesan <vish@hdfgroup.org>2013-07-31 22:25:31 (GMT)
committerVishwanath Venkatesan <vish@hdfgroup.org>2013-07-31 22:25:31 (GMT)
commitaa8503c35f047b3db20fd907a34789ee54568a7f (patch)
tree4bf145381fdf6c84adf2fedf8f5af02c2e215e1f
parentf4cb87ec1a4d57bdaae89ac03f39aada49ba3a4f (diff)
downloadhdf5-aa8503c35f047b3db20fd907a34789ee54568a7f.zip
hdf5-aa8503c35f047b3db20fd907a34789ee54568a7f.tar.gz
hdf5-aa8503c35f047b3db20fd907a34789ee54568a7f.tar.bz2
[svn-r23957] Code cleanup + Fixing issues for handling writing
from compactor for multiple clients.
-rw-r--r--src/H5VLiod_compactor.c36
-rw-r--r--src/H5VLiod_compactor.h15
-rw-r--r--src/H5VLiod_dset.c165
3 files changed, 120 insertions, 96 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c
index 77eff0f..f6247ac 100644
--- a/src/H5VLiod_compactor.c
+++ b/src/H5VLiod_compactor.c
@@ -185,6 +185,10 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
int j = 0, request_id = 0, i, current_dset_id = 0, lreq = 0, flag = 0;
size_t ii;
uint64_t axe_id;
+
+#if H5_DO_NATIVE
+ char dname[256], dname1[256];
+#endif
@@ -297,9 +301,18 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
num_datasets++;
}
- /*from the second id!*/else{
+ /*from the second id!*/
+ else{
+#if H5_DO_NATIVE
+ H5Iget_name (current_dset, dname, 257);
+ H5Iget_name (dataset_id, dname1, 257);
+ if ( (current_dset == (hid_t)dataset_id) ||
+ (!(strcmp(dname, dname1))) ) {
+#else
if (current_dset == (hid_t)dataset_id){
+#endif
+
#if DEBUG_COMPACTOR
fprintf(stderr, "in %s:%d current dataset: %d has the request %d at %d \n",
__FILE__, __LINE__,
@@ -372,6 +385,8 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
}
+
+
/*Allocation buffer and retrieving the buffer associated with the selection
through the function shipper*/
if(NULL == (buf = malloc(buf_size)))
@@ -2037,4 +2052,23 @@ int H5VL_iod_sort_block_container (block_container_t *io_array,
}
+int H5VL_iod_free_memory_buffer (request_list_t *list, int num_requests){
+
+ int ret_value = CP_SUCCESS;
+ int i;
+
+ for (i = 0; i< num_requests; i++){
+
+ if (list[i].merged == USED_IN_MERGING){
+ if (list[i].mem_buf){
+ free(list[i].mem_buf);
+ list[i].mem_buf = NULL;
+ list[i].mem_length = 0;
+ }
+ }
+ }
+
+ return ret_value;
+}
+
#endif /* H5_HAVE_EFF*/
diff --git a/src/H5VLiod_compactor.h b/src/H5VLiod_compactor.h
index 042abac..355d537 100644
--- a/src/H5VLiod_compactor.h
+++ b/src/H5VLiod_compactor.h
@@ -112,13 +112,14 @@ H5_DLL int H5VL_iod_construct_merged_request (request_list_t *list,
int *request_list, int num_requests);
H5_DLL int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
- block_container_t *sm_block,
- block_container_t **out_sf_block,
- block_container_t **out_sm_block,
- size_t *revfblks, size_t *revmblks,
- size_t i, size_t j,
- int *changed, int *changed_cnt);
-
+ block_container_t *sm_block,
+ block_container_t **out_sf_block,
+ block_container_t **out_sm_block,
+ size_t *revfblks, size_t *revmblks,
+ size_t i, size_t j,
+ int *changed, int *changed_cnt);
+
+H5_DLL int H5VL_iod_free_memory_buffer (request_list_t *list, int num_requests);
/*---------------------------------------------------------------------------------------- */
diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c
index 6710d9d..35f022f 100644
--- a/src/H5VLiod_dset.c
+++ b/src/H5VLiod_dset.c
@@ -659,7 +659,9 @@ done:
fprintf(stderr, " COMPACTOR CB: Sleeping\n");
fflush(stderr);
#endif
- usleep(1000);
+#if H5_DO_NATIVE
+ sleep(1);
+#endif
@@ -720,30 +722,30 @@ done:
Once that is done --> execute the func : H5VL_iod_server_send_result
This sends results to all the clients about their completion.
-
- There is a flaw in the IOD construction of io_array_t datastructure.
- They seem to have mem_desc and io_desc as pointers and there is no-way to specify
- the number of descriptors!.. We need that to free the memory descriptors.
- For now we can leave it at this!
- */
- if (NULL != wlist){
- free(wlist);
- wlist = NULL;
- }
- if (NULL != rlist){
- free(rlist);
- wlist = NULL;
- }
- if (NULL != dlist){
- free(dlist);
- dlist = NULL;
- }
+ */
+
+ if (CP_SUCCESS != H5VL_iod_free_memory_buffer (wlist, nentries)){
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Failed while freeing memory buffer\n");
+ }
+
+ if (NULL != wlist){
+ free(wlist);
+ wlist = NULL;
+ }
+ if (NULL != rlist){
+ free(rlist);
+ wlist = NULL;
+ }
+ if (NULL != dlist){
+ free(dlist);
+ dlist = NULL;
+ }
if (NULL != drlist){
free(drlist);
drlist = NULL;
}
-
+
if (CP_SUCCESS != H5VL_iod_destroy_compactor_queue(cqueue)){
HGOTO_ERROR(H5E_HEAP, H5E_NOSPACE, CP_FAIL, "Cannot free NULL queue\n");
@@ -796,6 +798,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
iod_array_io_t *io_array = NULL; /* arary for list I/O */
iod_checksum_t *cs_list = NULL;
iod_ret_t *ret_list = NULL;
+
#if H5_DO_NATIVE
size_t native_length = 0, total_length = 0;
char *write_buf = NULL, *write_buf_ptr = NULL;
@@ -822,71 +825,65 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
for (request_counter = 0; request_counter < num_requests; request_counter++){
if (list[request_counter].merged != USED_IN_MERGING){
+#if H5_DO_NATIVE
+ native_dims = (hsize_t *) malloc (sizeof(hsize_t));
+#endif
+
+ op_data = (op_data_t *)list[request_counter].op_data;
+ input = (dset_io_in_t *)op_data->input;
+ coh = input->coh;
+ iod_oh = input->iod_oh;
+ iod_id = input->iod_id;
+ bulk_handle = input->bulk_handle;
+ space_id = list[request_counter].selection_id;
+ dst_id = input->dset_type_id;
+ src_id = input->mem_type_id;
+ dst_size = H5Tget_size(dst_id);
+ src_size = H5Tget_size(src_id);
/*When the request was used in merging we have to just
manage the handle*/
if (list[request_counter].merged == NOT_MERGED){
- op_data = (op_data_t *)list[request_counter].op_data;
- input = (dset_io_in_t *)op_data->input;
- coh = input->coh;
- iod_oh = input->iod_oh;
- iod_id = input->iod_id;
- bulk_handle = input->bulk_handle;
- space_id = list[request_counter].selection_id;
- dst_id = input->dset_type_id;
-
fprintf (stderr,"COMPACTOR space_id: %d, selection_id: %d\n",
space_id, list[request_counter].selection_id);
/*Even in the case its not merged the buffer was already extracted*/
- fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter+1);
size = list[request_counter].mem_length;
+ fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged\n",
+ request_counter+1, size, src_size);
buf = (void *)list[request_counter].mem_buf;
-
-#if DEBUG_COMPACTOR
- ptr = (int *) buf;
- fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :",
- size);
- for(i=0;i< (int)size/4;++i){
- fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n",
- i, ptr[i]);
- }
- fprintf(stderr, "\n");
+
+#if H5_DO_NATIVE
+ *native_dims = (hsize_t)(size/src_size);
+ write_buf = (char *)buf;
#endif
+ buf_ptr = (uint8_t *)buf;
}
+
if (list[request_counter].merged == MERGED){
- op_data = (op_data_t *)list[request_counter].op_data;
- input = (dset_io_in_t *)op_data->input;
- coh = input->coh;
- iod_oh = input->iod_oh;
- iod_id = input->iod_id;
- space_id = list[request_counter].selection_id;
- dst_id = input->dset_type_id;
- src_id = input->dset_type_id;
-
#if H5_DO_NATIVE
for (hi = 0; hi < list[request_counter].num_mblocks; hi++){
total_length += list[request_counter].mblocks[hi].len;
}
- fprintf (stderr,"total_length: %zd\n", total_length);
+ *native_dims = (hsize_t)(total_length/src_size);
#endif
#if DEBUG_COMPACTOR
- fprintf (stderr,"space_id: %d, selection_id: %d\n",
- space_id, list[request_counter].selection_id);
/*Even in the case its not merged the buffer was already extracted*/
- fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", request_counter+1);
+ fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n",
+ request_counter+1);
#endif
}
- dst_size = H5Tget_size(dst_id);
- src_size = H5Tget_size(src_id);
+
+ /*#####################################################################*/
if(iod_oh.cookie == (int)IOD_OH_UNDEFINED) {
if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0)
HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group");
opened_locally = TRUE;
}
+
/* get the rank of the dataspace */
if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0)
HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion");
@@ -895,12 +892,14 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
I/O operations needed */
if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
+ /*#####################################################################*/
#if DEBUG_COMPACTOR
fprintf (stderr,"COMPACTOR num_descriptors : %llu, ndims: %d\n",
num_descriptors,
ndims);
#endif
+
/* allocate the IOD hyperslab descriptors needed */
if(NULL == (hslabs = (iod_hyperslab_t *)malloc
(sizeof(iod_hyperslab_t) * (size_t)num_descriptors)))
@@ -955,7 +954,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
#endif
curr_j = 0;
-
for(n=0 ; n<num_descriptors ; n++) {
num_bytes = 0;
@@ -966,39 +964,36 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]);
num_bytes = num_elems * dst_size;
-
/*new memory descriptor for this hslab descriptor*/
if (mem_desc != NULL)
mem_desc = NULL;
-
mem_desc = (iod_mem_desc_t *) malloc (sizeof(*mem_desc) + sizeof(mem_desc->frag[0]));
if (NULL == mem_desc){
HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL,"Can't allocate mem_desc \n");
}
if (list[request_counter].merged == NOT_MERGED){
-
- buf_ptr = (uint8_t *)buf;
-
+
+#if DEBUG_COMPACTOR
+ ptr = (int *) buf;
+ fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :",
+ size);
+ for(i=0;i< (int)size/4;++i){
+ fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n",
+ i, ptr[i]);
+ }
+ fprintf(stderr, "\n");
+#endif
/* set the memory descriptor */
mem_desc->nfrag = 1;
mem_desc->frag[0].addr = (void *)buf_ptr;
mem_desc->frag[0].len = (iod_size_t)num_bytes;
-
file_desc = hslabs[n];
-
-#if H5_DO_NATIVE
- write_buf = (char *)buf;
-#endif
buf_ptr += num_bytes;
-
}
if (list[request_counter].merged == MERGED){
-
-
-
#if DEBUG_COMPACTOR
fprintf (stderr, "COMPACTOR WRITE i: %d, num_mblocks: %zd, num_bytes: %lli, start_reqs: %lli\n",
request_counter, list[request_counter].num_mblocks, num_bytes, start_reqs);
@@ -1007,7 +1002,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
j, list[request_counter].mblocks[j].len);
}
#endif
-
+
#if H5_DO_NATIVE
write_buf_ptr = write_buf;
#endif
@@ -1040,13 +1035,13 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
}
}
}
-
curr_j += 1;
/*Determined the number of entries in the memory block for this hslab
descriptor*/
tmp = (iod_mem_desc_t *) realloc
(mem_desc, sizeof(*tmp) + (size_t)(mem_reqs * sizeof(mem_desc->frag[0])));
+
if (tmp){
mem_desc = tmp;
mem_desc->nfrag = mem_reqs;
@@ -1054,10 +1049,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
}
k = 0;
-
-
for ( j = start_reqs; j < (start_reqs + mem_reqs); j++){
-
if ((j == curr_j) && (bytes_left)){
mem_desc->frag[k].addr = (void *)(uintptr_t)(curr_offset);
mem_desc->frag[k].len = bytes_left;
@@ -1078,8 +1070,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
k++;
}
curr_k += k;
-
-
+
#if H5_DO_NATIVE
fprintf (stderr, "Printing the native constructed buffer of length :%zd\n",
native_length);
@@ -1090,8 +1081,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
fprintf(stderr, "\n");
#endif
-
-
#if DEBUG_COMPACTOR
for ( j = 0 ; j < k; j++){
ptr = (int *)(mem_desc->frag[j].addr);
@@ -1104,6 +1093,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
fflush(stderr);
#endif
}
+
file_desc = hslabs[n];
#if H5VL_IOD_DEBUG
@@ -1113,27 +1103,27 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
(size_t)file_desc.block[i], (size_t)file_desc.count[i]);
}
#endif
-
-
io_array[n].oh = iod_oh;
io_array[n].hints = NULL;
io_array[n].mem_desc = mem_desc;
io_array[n].io_desc = &file_desc;
io_array[n].cs = &cs_list[n];
io_array[n].ret = &ret_list[n];
-
}
-
+#if (!H5_DO_NATIVE)
if(iod_array_write_list(coh, IOD_TID_UNKNOWN, (iod_size_t)num_descriptors,
io_array, NULL) < 0)
HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't write array object");
-
+#endif
#if H5_DO_NATIVE
- native_dims = (hsize_t *) malloc (sizeof(hsize_t));
- *native_dims = (hsize_t)(total_length/src_size);
- /*We constructed a contiguous buffer here so the selection */
+
+
+#if DEBUG_COMPACTOR
+ fprintf(stderr,"native_dims : %lli\n", *native_dims);
+#endif
+
mem_dataspace = H5Screate_simple(1, native_dims, NULL);
ret_value = H5Dwrite(iod_oh.cookie,dst_id, mem_dataspace, space_id, H5P_DEFAULT,
write_buf);
@@ -1144,7 +1134,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
}
if (NULL != native_dims)
free(native_dims);
-
#endif
if (NULL != mem_desc){