diff options
author | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-12 23:49:55 (GMT) |
---|---|---|
committer | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-12 23:49:55 (GMT) |
commit | fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d (patch) | |
tree | 92bf8cc69ae25fa53e5cd45f1f3c004c496313f5 | |
parent | a8a349b0dcd299f37d3d94d93d688979b137c319 (diff) | |
download | hdf5-fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d.zip hdf5-fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d.tar.gz hdf5-fb13bdbd0b91fd07665b60ded99d3d187f6b0a2d.tar.bz2 |
[svn-r23894] End of day commit.
-rw-r--r-- | src/H5VLiod_compactor.c | 25 | ||||
-rw-r--r-- | src/H5VLiod_dset.c | 264 |
2 files changed, 192 insertions, 97 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c index 86c017d..91245ad 100644 --- a/src/H5VLiod_compactor.c +++ b/src/H5VLiod_compactor.c @@ -745,6 +745,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests, if (!merge_flag){ merge_flag = 1; } + list[request_list[i]].merged = USED_IN_MERGING; last_merged = res_dataspace; /*update the last merged!*/ } else{ @@ -837,6 +838,14 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests, free(lselected_req); lselected_req = NULL; } + for (i = 0; i < original_requests; i ++){ + fprintf(stderr,"%s:%d, Request :%d --> %d\n", + __FILE__, + __LINE__, + i, + list[i].merged); + } + *total_requests = original_requests; done: @@ -874,7 +883,7 @@ int H5VL_iod_construct_merged_request (request_list_t *list, size_t g_entries = 0, j, m_entries = 0; size_t *glens = NULL, blck_cnt = 0, *mlen = NULL; block_container_t *sf_block = NULL, *sm_block = NULL; - int ret_value = CP_SUCCESS; + int ret_value = CP_SUCCESS, set_op_data = 0; FUNC_ENTER_NOAPI(NULL) @@ -890,6 +899,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list, } #endif + + fblks = H5VL_iod_get_selected_fblocks_count(lselected_req, num_selected, list); @@ -917,7 +928,10 @@ int H5VL_iod_construct_merged_request (request_list_t *list, for ( i = 0; i < num_requests; i++){ if (CP_SUCCESS == H5VL_iod_request_exist(request_list[i], lselected_req, num_selected)){ - + if (!set_op_data){ + merged_request->op_data = list[request_list[i]].op_data; + set_op_data; + } for (j = 0; j < list[request_list[i]].num_fblocks; j++){ sf_block[blck_cnt].offset = list[request_list[i]].fblocks[j].offset; sf_block[blck_cnt].len = list[request_list[i]].fblocks[j].len; @@ -1021,6 +1035,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list, merged_request->fblocks[j].offset = goffsets[j]; merged_request->fblocks[j].len = glens[j]; } + merged_request->num_fblocks = m_entries; + /* Then compare that with automatically generated offsets (the offsets have to match) */ @@ -1044,7 +1060,10 @@ int H5VL_iod_construct_merged_request (request_list_t *list, merged_request->mblocks[j].len); #endif } - } /*if num_selected > 1*/ + merged_request->num_mblocks = mblks; + } + + /*if num_selected > 1*/ else{ #if DEBUG_COMPACTOR fprintf(stderr,"in %s:%d Should not be here with num_selected :%d \n", diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c index dc56cd0..215ebbd 100644 --- a/src/H5VLiod_dset.c +++ b/src/H5VLiod_dset.c @@ -1,4 +1,4 @@ - /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * Copyright by The HDF Group. * * Copyright by the Board of Trustees of the University of Illinois. * * All rights reserved. * @@ -626,6 +626,7 @@ FUNC_LEAVE_NOAPI_VOID } /* end H5VL_iod_server_dset_read_cb() */ + /*------------------------------------------------------------------------- * Function: H5VL_iod_server_dset_compactor_cb * @@ -668,9 +669,7 @@ compactor_queue_flag = 0; curr_queue = NULL; pthread_mutex_unlock(&lock); - - - + ret_value = H5VL_iod_create_request_list (cqueue, &wlist, &nentries, @@ -685,7 +684,7 @@ } #endif -#if 0 +#if 1 for ( i = 0; i < ndatasets; i ++){ H5VL_iod_compact_requests (wlist, &nentries,dlist[i].num_requests, dlist[i].requests); @@ -730,9 +729,7 @@ *------------------------------------------------------------------------- */ - -int -H5VL_iod_server_compactor_write (void *_list, int num_requests) +int H5VL_iod_server_compactor_write (void *_list, int num_requests) { @@ -748,15 +745,16 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests) hg_bulk_t bulk_handle; hid_t space_id, dst_id; size_t size, dst_size; - void *buf; + void *buf = NULL; uint8_t *buf_ptr; hssize_t num_descriptors = 0, n =0; hbool_t opened_locally = FALSE; iod_hyperslab_t *hslabs = NULL; - iod_mem_desc_t mem_desc; + iod_mem_desc_t *mem_desc = NULL; iod_array_iodesc_t file_desc; - hsize_t num_bytes = 0; - hsize_t num_elems = 0; + hsize_t num_bytes = 0, mem_reqs = 0, start_reqs = 0; + hsize_t num_elems = 0, k = 0, j=0; + hsize_t curr_j = 0, bytes_left = 0; FUNC_ENTER_NOAPI_NOINIT @@ -773,62 +771,82 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests) #endif for (request_counter = 0; request_counter < num_requests; request_counter++){ - op_data = (op_data_t *)list[request_counter].op_data; - input = (dset_io_in_t *)op_data->input; - iod_oh = input->iod_oh; - iod_id = input->iod_id; - bulk_handle = input->bulk_handle; - space_id = list[request_counter].selection_id; - dst_id = input->dset_type_id; - dst_size = H5Tget_size(dst_id); - - if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0) - HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion"); - - fprintf (stderr,"space_id: %d, selection_id: %d, ndims : %d\n", - space_id, list[request_counter].selection_id, ndims); - - if(iod_oh.cookie == IOD_OH_UNDEFINED) { - if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0) - HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group"); - opened_locally = TRUE; - } - - if (list[request_counter].merged == NOT_MERGED){ - /*Even in the case its not merged the buffer was already extracted*/ - fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter); - size = list[request_counter].mem_length; - buf = (void *)list[request_counter].mem_buf; - } - + if (list[request_counter].merged != USED_IN_MERGING){ + + /*When the request was used in merging we have to just + manage the handle*/ + if (list[request_counter].merged == NOT_MERGED){ + op_data = (op_data_t *)list[request_counter].op_data; + input = (dset_io_in_t *)op_data->input; + iod_oh = input->iod_oh; + iod_id = input->iod_id; + bulk_handle = input->bulk_handle; + space_id = list[request_counter].selection_id; + dst_id = input->dset_type_id; + dst_size = H5Tget_size(dst_id); + + fprintf (stderr,"space_id: %d, selection_id: %d\n", + space_id, list[request_counter].selection_id); + + /*Even in the case its not merged the buffer was already extracted*/ + fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter+1); + size = list[request_counter].mem_length; + buf = (void *)list[request_counter].mem_buf; #if 1 - ptr = (int *) buf; - - fprintf(stderr,"COMPACTOR WRITE: Received a buffer or size %zd with values :", - size); - for(i=0;i<size/4;++i){ - fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n", - i, ptr[i]); - } - fprintf(stderr, "\n"); + ptr = (int *) buf; + fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :", + size); + for(i=0;i<size/4;++i){ + fprintf(stderr, "COMPACTOR WRITE: buf[%d] : %d\n", + i, ptr[i]); + } + fprintf(stderr, "\n"); #endif + } - /* get the rank of the dataspace */ - - - /* get the number of decriptors required, i.e. the numbers of iod + if (list[request_counter].merged == MERGED){ + op_data = (op_data_t *)list[request_counter].op_data; + input = (dset_io_in_t *)op_data->input; + iod_oh = input->iod_oh; + iod_id = input->iod_id; + space_id = list[request_counter].selection_id; + dst_id = input->dset_type_id; + dst_size = H5Tget_size(dst_id); + + fprintf (stderr,"space_id: %d, selection_id: %d\n", + space_id, list[request_counter].selection_id); + + /*Even in the case its not merged the buffer was already extracted*/ + fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", request_counter+1); + } + + + if(iod_oh.cookie == IOD_OH_UNDEFINED) { + if (iod_obj_open_write(coh, iod_id, NULL /*hints*/, &iod_oh, NULL) < 0) + HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't open current group"); + opened_locally = TRUE; + } + + + + /* get the rank of the dataspace */ + if((ndims = H5Sget_simple_extent_ndims(space_id)) < 0) + HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, FAIL, "unable to get dataspace dimesnsion"); + + /* get the number of decriptors required, i.e. the numbers of iod I/O operations needed */ - if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0) - HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection"); + if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection"); + + fprintf (stderr,"num_descriptors : %llu, ndims: %d\n", + num_descriptors, + ndims); + + /* allocate the IOD hyperslab descriptors needed */ + if(NULL == (hslabs = (iod_hyperslab_t *)malloc + (sizeof(iod_hyperslab_t) * (size_t)num_descriptors))) + HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors"); - fprintf (stderr,"num_descriptors : %llu, ndims: %d\n", - num_descriptors, - ndims); - /* allocate the IOD hyperslab descriptors needed */ - if(NULL == (hslabs = (iod_hyperslab_t *)malloc - (sizeof(iod_hyperslab_t) * (size_t)num_descriptors))) - HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors"); - for (n = 0; n < num_descriptors; n++){ hslabs[n].start = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims); hslabs[n].stride = (iod_size_t *)malloc(sizeof(iod_size_t) * ndims); @@ -839,14 +857,15 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests) /* generate the descriptors after allocating the array */ if(H5VL_iod_get_file_desc(space_id, &num_descriptors, hslabs) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection"); - - buf_ptr = (uint8_t *)buf; + curr_j = 0; for(n=0 ; n<num_descriptors ; n++) { + + fprintf (stderr, + "Enters for num_descriptors : %llu, n : %llu\n", + num_descriptors, n); - fprintf (stderr, "Enters for num_descriptors : %llu, n : %llu\n", num_descriptors, n); num_bytes = 0; num_elems = 0; - /* determine how many bytes the current descriptor holds */ for(i=0 ; i<ndims ; i++){ @@ -854,34 +873,89 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests) num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]); } num_bytes = num_elems * dst_size; -#if 0 - /* set the memory descriptor */ - mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t)); - mem_desc->nfrag = 1; - mem_desc->frag[0].addr = (void *)buf_ptr; - mem_desc->frag[0].len = (iod_size_t)num_bytes; + + if (list[request_counter].merged == NOT_MERGED){ + buf_ptr = (uint8_t *)buf; +#if 1 + /* set the memory descriptor */ + mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t)); + mem_desc->frag = (iod_mem_frag_t *) malloc (sizeof (iod_mem_desc_t)); + mem_desc->nfrag = 1; + mem_desc->frag[0].addr = (void *)buf_ptr; + mem_desc->frag[0].len = (iod_size_t)num_bytes; #endif - buf_ptr += num_bytes; - file_desc = hslabs[n]; + buf_ptr += num_bytes; #if H5VL_IOD_DEBUG - for(i=0 ; i<ndims ; i++) { - fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n", - i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i], - (size_t)file_desc.block[i], (size_t)file_desc.count[i]); + for(i=0 ; i<ndims ; i++) { + fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n", + i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i], + (size_t)file_desc.block[i], (size_t)file_desc.count[i]); + } +#endif + if (NULL != buf){ + free(buf); + } } -#endif - /* write from array object */ - /* TODO: VV Change checksum once that is fixed*/ - if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL, NULL /*This is where the memory descriptor goes*/, - &file_desc, NULL, NULL) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object"); - } + if (list[request_counter].merged == MERGED){ + k = 0; + mem_reqs = 0; start_reqs = curr_j; + for (j = curr_j; j < list[i].num_mblocks; j++){ + if (k < num_bytes){ + if (bytes_left){ + k += bytes_left; + mem_reqs++; + continue; + } + else if (k + list[i].mblocks[j].len > num_bytes){ + bytes_left = list[i].mblocks[j].len - (num_bytes - k ); + curr_j = j; + mem_reqs++; + break; + } + else{ /* <= case*/ + k += list[i].mblocks[j].len; + curr_j = j; + mem_reqs++; + } + } + } + /*Determined the number of entries in the memory block for this hslab + descriptor*/ + mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t)); + mem_desc->nfrag = mem_reqs; + mem_desc->frag = (iod_mem_frag_t *) malloc (mem_reqs * + sizeof (iod_mem_desc_t)); + k = 0; + for ( j = start_reqs; j < mem_reqs; j++){ + mem_desc->frag[k].addr = list[i].mblocks[j].offset; + mem_desc->frag[k].len = list[i].mblocks[j].len; + } + } + } + /* write from array object */ + /* TODO: VV Change checksum once that is fixed*/ + file_desc = hslabs[n]; + if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL, + mem_desc /*This is where the memory descriptor goes*/, + &file_desc, + NULL, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object"); + + if (NULL != mem_desc->frag){ + free(mem_desc->frag); + } + if (NULL != mem_desc){ + free(mem_desc); + mem_desc = NULL; + } + + } #if H5VL_IOD_DEBUG fprintf(stderr, "Done with dset write, sending %d response to client\n", ret_value); #endif - free(buf); + /* free allocated descriptors */ for(n=0 ; n<num_descriptors ; n++) { @@ -899,17 +973,19 @@ H5VL_iod_server_compactor_write (void *_list, int num_requests) HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, FAIL, "can't close Array object"); } } + for (i = 0; i< num_requests; i++){ - op_data = (op_data_t *)list[i].op_data; - input = (dset_io_in_t *)op_data->input; - if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &ret_value)) - HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't send result of write to client"); - input = (dset_io_in_t *)H5MM_xfree(input); - op_data = (op_data_t *)H5MM_xfree(op_data); + if (list[i].merged != MERGED){ + op_data = (op_data_t *)list[i].op_data; + input = (dset_io_in_t *)op_data->input; + if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &ret_value)) + HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, FAIL, "can't send result of write to client"); + input = (dset_io_in_t *)H5MM_xfree(input); + op_data = (op_data_t *)H5MM_xfree(op_data); + } } done: - FUNC_LEAVE_NOAPI(ret_value); }/*H5VL_iod_server_compactor_write*/ |