diff options
author | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-31 22:25:31 (GMT) |
---|---|---|
committer | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-31 22:25:31 (GMT) |
commit | aa8503c35f047b3db20fd907a34789ee54568a7f (patch) | |
tree | 4bf145381fdf6c84adf2fedf8f5af02c2e215e1f | |
parent | f4cb87ec1a4d57bdaae89ac03f39aada49ba3a4f (diff) | |
download | hdf5-aa8503c35f047b3db20fd907a34789ee54568a7f.zip hdf5-aa8503c35f047b3db20fd907a34789ee54568a7f.tar.gz hdf5-aa8503c35f047b3db20fd907a34789ee54568a7f.tar.bz2 |
[svn-r23957] Code cleanup + Fixing issues for handling writing
from compactor for multiple clients.
-rw-r--r-- | src/H5VLiod_compactor.c | 36 | ||||
-rw-r--r-- | src/H5VLiod_compactor.h | 15 | ||||
-rw-r--r-- | src/H5VLiod_dset.c | 165 |
3 files changed, 120 insertions, 96 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c index 77eff0f..f6247ac 100644 --- a/src/H5VLiod_compactor.c +++ b/src/H5VLiod_compactor.c @@ -185,6 +185,10 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list, int j = 0, request_id = 0, i, current_dset_id = 0, lreq = 0, flag = 0; size_t ii; uint64_t axe_id; + +#if H5_DO_NATIVE + char dname[256], dname1[256]; +#endif @@ -297,9 +301,18 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list, num_datasets++; } - /*from the second id!*/else{ + /*from the second id!*/ + else{ +#if H5_DO_NATIVE + H5Iget_name (current_dset, dname, 257); + H5Iget_name (dataset_id, dname1, 257); + if ( (current_dset == (hid_t)dataset_id) || + (!(strcmp(dname, dname1))) ) { +#else if (current_dset == (hid_t)dataset_id){ +#endif + #if DEBUG_COMPACTOR fprintf(stderr, "in %s:%d current dataset: %d has the request %d at %d \n", __FILE__, __LINE__, @@ -372,6 +385,8 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list, } + + /*Allocation buffer and retrieving the buffer associated with the selection through the function shipper*/ if(NULL == (buf = malloc(buf_size))) @@ -2037,4 +2052,23 @@ int H5VL_iod_sort_block_container (block_container_t *io_array, } +int H5VL_iod_free_memory_buffer (request_list_t *list, int num_requests){ + + int ret_value = CP_SUCCESS; + int i; + + for (i = 0; i< num_requests; i++){ + + if (list[i].merged == USED_IN_MERGING){ + if (list[i].mem_buf){ + free(list[i].mem_buf); + list[i].mem_buf = NULL; + list[i].mem_length = 0; + } + } + } + + return ret_value; +} + #endif /* H5_HAVE_EFF*/ diff --git a/src/H5VLiod_compactor.h b/src/H5VLiod_compactor.h index 042abac..355d537 100644 --- a/src/H5VLiod_compactor.h +++ b/src/H5VLiod_compactor.h @@ -112,13 +112,14 @@ H5_DLL int H5VL_iod_construct_merged_request (request_list_t *list, int *request_list, int num_requests); H5_DLL int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block, - block_container_t *sm_block, - block_container_t **out_sf_block, - block_container_t **out_sm_block, - size_t *revfblks, size_t *revmblks, - size_t i, size_t j, - int *changed, int *changed_cnt); - + block_container_t *sm_block, + block_container_t **out_sf_block, + block_container_t **out_sm_block, + size_t *revfblks, size_t *revmblks, + size_t i, size_t j, + int *changed, int *changed_cnt); + +H5_DLL int H5VL_iod_free_memory_buffer (request_list_t *list, int num_requests); /*---------------------------------------------------------------------------------------- */ diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c index 6710d9d..35f022f 100644 --- a/src/H5VLiod_dset.c +++ b/src/H5VLiod_dset.c @@ -659,7 +659,9 @@ done: fprintf(stderr, " COMPACTOR CB: Sleeping\n"); fflush(stderr); #endif - usleep(1000); +#if H5_DO_NATIVE + sleep(1); +#endif @@ -720,30 +722,30 @@ done: Once that is done --> execute the func : H5VL_iod_server_send_result This sends results to all the clients about their completion. - - There is a flaw in the IOD construction of io_array_t datastructure. - They seem to have mem_desc and io_desc as pointers and there is no-way to specify - the number of descriptors!.. We need that to free the memory descriptors. - For now we can leave it at this! - */ - if (NULL != wlist){ - free(wlist); - wlist = NULL; - } - if (NULL != rlist){ - free(rlist); - wlist = NULL; - } - if (NULL != dlist){ - free(dlist); - dlist = NULL; - } + */ + + if (CP_SUCCESS != H5VL_iod_free_memory_buffer (wlist, nentries)){ + HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Failed while freeing memory buffer\n"); + } + + if (NULL != wlist){ + free(wlist); + wlist = NULL; + } + if (NULL != rlist){ + free(rlist); + wlist = NULL; + } + if (NULL != dlist){ + free(dlist); + dlist = NULL; + } if (NULL != drlist){ free(drlist); drlist = NULL; } - + if (CP_SUCCESS != H5VL_iod_destroy_compactor_queue(cqueue)){ HGOTO_ERROR(H5E_HEAP, H5E_NOSPACE, CP_FAIL, "Cannot free NULL queue\n"); @@ -796,6 +798,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) iod_array_io_t *io_array = NULL; /* arary for list I/O */ iod_checksum_t *cs_list = NULL; iod_ret_t *ret_list = NULL; + #if H5_DO_NATIVE size_t native_length = 0, total_length = 0; char *write_buf = NULL, *write_buf_ptr = NULL; @@ -822,71 +825,65 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) for (request_counter = 0; request_counter < num_requests; request_counter++){ if (list[request_counter].merged != USED_IN_MERGING){ +#if H5_DO_NATIVE + native_dims = (hsize_t *) malloc (sizeof(hsize_t)); +#endif + + op_data = (op_data_t *)list[request_counter].op_data; + input = (dset_io_in_t *)op_data->input; + coh = input->coh; + iod_oh = input->iod_oh; + iod_id = input->iod_id; + bulk_handle = input->bulk_handle; + space_id = list[request_counter].selection_id; + dst_id = input->dset_type_id; + src_id = input->mem_type_id; + dst_size = H5Tget_size(dst_id); + src_size = H5Tget_size(src_id); /*When the request was used in merging we have to just manage the handle*/ if (list[request_counter].merged == NOT_MERGED){ - op_data = (op_data_t *)list[request_counter].op_data; - input = (dset_io_in_t *)op_data->input; - coh = input->coh; - iod_oh = input->iod_oh; - iod_id = input->iod_id; - bulk_handle = input->bulk_handle; - space_id = list[request_counter].selection_id; - dst_id = input->dset_type_id; - fprintf (stderr,"COMPACTOR space_id: %d, selection_id: %d\n", space_id, list[request_counter].selection_id); /*Even in the case its not merged the buffer was already extracted*/ - fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter+1); size = list[request_counter].mem_length; + fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged\n", + request_counter+1, size, src_size); buf = (void *)list[request_counter].mem_buf; - -#if DEBUG_COMPACTOR - ptr = (int *) buf; - fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :", - size); - for(i=0;i< (int)size/4;++i){ - fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n", - i, ptr[i]); - } - fprintf(stderr, "\n"); + +#if H5_DO_NATIVE + *native_dims = (hsize_t)(size/src_size); + write_buf = (char *)buf; #endif + buf_ptr = (uint8_t *)buf; } + if (list[request_counter].merged == MERGED){ - op_data = (op_data_t *)list[request_counter].op_data; - input = (dset_io_in_t *)op_data->input; - coh = input->coh; - iod_oh = input->iod_oh; - iod_id = input->iod_id; - space_id = list[request_counter].selection_id; - dst_id = input->dset_type_id; - src_id = input->dset_type_id; - #if H5_DO_NATIVE for (hi = 0; hi < list[request_counter].num_mblocks; hi++){ total_length += list[request_counter].mblocks[hi].len; } - fprintf (stderr,"total_length: %zd\n", total_length); + *native_dims = (hsize_t)(total_length/src_size); #endif #if DEBUG_COMPACTOR - fprintf (stderr,"space_id: %d, selection_id: %d\n", - space_id, list[request_counter].selection_id); /*Even in the case its not merged the buffer was already extracted*/ - fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", request_counter+1); + fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", + request_counter+1); #endif } - dst_size = H5Tget_size(dst_id); - src_size = H5Tget_size(src_id); + + /*#####################################################################*/ if(iod_oh.cookie == (int)IOD_OH_UNDEFINED) { if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0) HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group"); opened_locally = TRUE; } + /* get the rank of the dataspace */ if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0) HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion"); @@ -895,12 +892,14 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) I/O operations needed */ if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection"); + /*#####################################################################*/ #if DEBUG_COMPACTOR fprintf (stderr,"COMPACTOR num_descriptors : %llu, ndims: %d\n", num_descriptors, ndims); #endif + /* allocate the IOD hyperslab descriptors needed */ if(NULL == (hslabs = (iod_hyperslab_t *)malloc (sizeof(iod_hyperslab_t) * (size_t)num_descriptors))) @@ -955,7 +954,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) #endif curr_j = 0; - for(n=0 ; n<num_descriptors ; n++) { num_bytes = 0; @@ -966,39 +964,36 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]); num_bytes = num_elems * dst_size; - /*new memory descriptor for this hslab descriptor*/ if (mem_desc != NULL) mem_desc = NULL; - mem_desc = (iod_mem_desc_t *) malloc (sizeof(*mem_desc) + sizeof(mem_desc->frag[0])); if (NULL == mem_desc){ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL,"Can't allocate mem_desc \n"); } if (list[request_counter].merged == NOT_MERGED){ - - buf_ptr = (uint8_t *)buf; - + +#if DEBUG_COMPACTOR + ptr = (int *) buf; + fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :", + size); + for(i=0;i< (int)size/4;++i){ + fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n", + i, ptr[i]); + } + fprintf(stderr, "\n"); +#endif /* set the memory descriptor */ mem_desc->nfrag = 1; mem_desc->frag[0].addr = (void *)buf_ptr; mem_desc->frag[0].len = (iod_size_t)num_bytes; - file_desc = hslabs[n]; - -#if H5_DO_NATIVE - write_buf = (char *)buf; -#endif buf_ptr += num_bytes; - } if (list[request_counter].merged == MERGED){ - - - #if DEBUG_COMPACTOR fprintf (stderr, "COMPACTOR WRITE i: %d, num_mblocks: %zd, num_bytes: %lli, start_reqs: %lli\n", request_counter, list[request_counter].num_mblocks, num_bytes, start_reqs); @@ -1007,7 +1002,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) j, list[request_counter].mblocks[j].len); } #endif - + #if H5_DO_NATIVE write_buf_ptr = write_buf; #endif @@ -1040,13 +1035,13 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) } } } - curr_j += 1; /*Determined the number of entries in the memory block for this hslab descriptor*/ tmp = (iod_mem_desc_t *) realloc (mem_desc, sizeof(*tmp) + (size_t)(mem_reqs * sizeof(mem_desc->frag[0]))); + if (tmp){ mem_desc = tmp; mem_desc->nfrag = mem_reqs; @@ -1054,10 +1049,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) } k = 0; - - for ( j = start_reqs; j < (start_reqs + mem_reqs); j++){ - if ((j == curr_j) && (bytes_left)){ mem_desc->frag[k].addr = (void *)(uintptr_t)(curr_offset); mem_desc->frag[k].len = bytes_left; @@ -1078,8 +1070,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) k++; } curr_k += k; - - + #if H5_DO_NATIVE fprintf (stderr, "Printing the native constructed buffer of length :%zd\n", native_length); @@ -1090,8 +1081,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) fprintf(stderr, "\n"); #endif - - #if DEBUG_COMPACTOR for ( j = 0 ; j < k; j++){ ptr = (int *)(mem_desc->frag[j].addr); @@ -1104,6 +1093,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) fflush(stderr); #endif } + file_desc = hslabs[n]; #if H5VL_IOD_DEBUG @@ -1113,27 +1103,27 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) (size_t)file_desc.block[i], (size_t)file_desc.count[i]); } #endif - - io_array[n].oh = iod_oh; io_array[n].hints = NULL; io_array[n].mem_desc = mem_desc; io_array[n].io_desc = &file_desc; io_array[n].cs = &cs_list[n]; io_array[n].ret = &ret_list[n]; - } - +#if (!H5_DO_NATIVE) if(iod_array_write_list(coh, IOD_TID_UNKNOWN, (iod_size_t)num_descriptors, io_array, NULL) < 0) HGOTO_ERROR(H5E_SYM, H5E_READERROR, FAIL, "can't write array object"); - +#endif #if H5_DO_NATIVE - native_dims = (hsize_t *) malloc (sizeof(hsize_t)); - *native_dims = (hsize_t)(total_length/src_size); - /*We constructed a contiguous buffer here so the selection */ + + +#if DEBUG_COMPACTOR + fprintf(stderr,"native_dims : %lli\n", *native_dims); +#endif + mem_dataspace = H5Screate_simple(1, native_dims, NULL); ret_value = H5Dwrite(iod_oh.cookie,dst_id, mem_dataspace, space_id, H5P_DEFAULT, write_buf); @@ -1144,7 +1134,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) } if (NULL != native_dims) free(native_dims); - #endif if (NULL != mem_desc){ |