diff options
author | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-25 23:14:46 (GMT) |
---|---|---|
committer | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-25 23:14:46 (GMT) |
commit | 0721ed12752da7dc22044236da176b5902c83e54 (patch) | |
tree | f81cc07d438f329332b6e25a7ad56387c54bd6a9 | |
parent | 315e2305153d0b67dd78e119db694accfcce85a3 (diff) | |
download | hdf5-0721ed12752da7dc22044236da176b5902c83e54.zip hdf5-0721ed12752da7dc22044236da176b5902c83e54.tar.gz hdf5-0721ed12752da7dc22044236da176b5902c83e54.tar.bz2 |
[svn-r23934] Merging multiple datasets for IOD. Logic in place -- implemented
-rw-r--r-- | src/H5VLiod_dset.c | 184 | ||||
-rw-r--r-- | src/H5VLiod_server.h | 8 |
2 files changed, 137 insertions, 55 deletions
diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c index 2d88976..67f58e1 100644 --- a/src/H5VLiod_dset.c +++ b/src/H5VLiod_dset.c @@ -627,7 +627,7 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine, request_list_t *wlist=NULL; dataset_container_t *dlist=NULL; int nentries = 0, ndatasets = 0; - + iod_array_io_t *array_write = NULL; FUNC_ENTER_NOAPI_NOINIT #if DEBUG_COMPACTOR @@ -640,7 +640,7 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine, usleep(1000); - + while(pthread_mutex_trylock(&lock) != 0); compactor_queue_flag = 0; curr_queue = NULL; @@ -659,20 +659,41 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine, ret_value); } #endif - -#if 1 + + array_write = (iod_array_io_t *) malloc (ndatasets * + sizeof (iod_array_io_t)); + if (NULL == array_write){ + HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "No space to allocate iod_array"); + } + for ( i = 0; i < ndatasets; i ++){ H5VL_iod_compact_requests (wlist, &nentries,dlist[i].num_requests, dlist[i].requests); - } -#endif - if (CP_SUCCESS != H5VL_iod_server_compactor_write (wlist, nentries)){ + if (CP_SUCCESS != H5VL_iod_server_compactor_write (wlist, nentries, + &array_write[i])){ #if DEBUG_COMPACTOR - fprintf (stderr,"COMPACTOR CB: compactor write failed \n"); + fprintf (stderr,"COMPACTOR CB: compactor write failed \n"); #endif - HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever write failed\n"); - } + HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever write failed\n"); + } + } + /*Call here iod_array_write_list + array_write array has been constructed. + The COH value can come from any of the list requests + Once that is done --> execute the func : H5VL_iod_server_send_result + This sends results to all the clients about their completion. + There is a flaw in the IOD construction of io_array_t datastructure. + They seem to have mem_desc and io_desc as pointers and there is no-way to specify + the number of descriptors!.. We need that to free the memory descriptors. + For now we can leave it at this! + */ + + + if (NULL != array_write){ + free(array_write); + array_write = NULL; + } if (NULL != wlist){ free(wlist); wlist = NULL; @@ -705,7 +726,8 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine, *------------------------------------------------------------------------- */ -int H5VL_iod_server_compactor_write (void *_list, int num_requests) +int H5VL_iod_server_compactor_write (void *_list, int num_requests, + iod_array_io_t *larray) { int ret_value = CP_SUCCESS; @@ -726,11 +748,12 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) hbool_t opened_locally = FALSE; iod_hyperslab_t *hslabs = NULL; iod_mem_desc_t *mem_desc = NULL; - iod_array_iodesc_t file_desc; + iod_array_iodesc_t *file_desc=NULL; hsize_t num_bytes = 0, mem_reqs = 0, start_reqs = 0; hsize_t num_elems = 0, k = 0, j=0, curr_k = 0; hsize_t curr_j = 0, bytes_left = 0, curr_offset = 0; + FUNC_ENTER_NOAPI_NOINIT if (num_requests <= 0){ @@ -755,6 +778,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) op_data = (op_data_t *)list[request_counter].op_data; input = (dset_io_in_t *)op_data->input; + coh = input->coh; iod_oh = input->iod_oh; iod_id = input->iod_id; bulk_handle = input->bulk_handle; @@ -785,8 +809,10 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) op_data = (op_data_t *)list[request_counter].op_data; input = (dset_io_in_t *)op_data->input; - iod_oh = input->iod_oh; + coh = input->coh; + iod_oh = input->iod_oh; iod_id = input->iod_id; + space_id = list[request_counter].selection_id; dst_id = input->dset_type_id; @@ -846,8 +872,20 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) } } #endif - + mem_desc = (iod_mem_desc_t *) malloc + (sizeof(iod_mem_desc_t) * (size_t)num_descriptors); + if (NULL == mem_desc){ + HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors"); + } + + file_desc = (iod_array_iodesc_t *) malloc + (sizeof(iod_array_iodesc_t) * (size_t)num_descriptors); + if (NULL == file_desc){ + HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors"); + } + curr_j = 0; + for(n=0 ; n<num_descriptors ; n++) { num_bytes = 0; @@ -863,17 +901,17 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) buf_ptr = (uint8_t *)buf; /* set the memory descriptor */ - mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t)); - mem_desc->frag = (iod_mem_frag_t *) malloc (sizeof (iod_mem_frag_t)); - mem_desc->nfrag = 1; - mem_desc->frag[0].addr = (void *)buf_ptr; - mem_desc->frag[0].len = (iod_size_t)num_bytes; + + mem_desc[n].frag = (iod_mem_frag_t *) malloc (sizeof (iod_mem_frag_t)); + mem_desc[n].nfrag = 1; + mem_desc[n].frag->addr = (void *)buf_ptr; + mem_desc[n].frag->len = (iod_size_t)num_bytes; buf_ptr += num_bytes; if (NULL != buf){ free(buf); } } - + if (list[request_counter].merged == MERGED){ #if DEBUG_COMPACTOR @@ -920,71 +958,81 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) /*Determined the number of entries in the memory block for this hslab descriptor*/ - if (NULL == mem_desc){ - mem_desc = (iod_mem_desc_t *) malloc (sizeof(iod_mem_desc_t)); - mem_desc->nfrag = mem_reqs; - mem_desc->frag = + mem_desc[n].nfrag = mem_reqs; + mem_desc[n].frag = (iod_mem_frag_t *) malloc ((int)mem_reqs * sizeof(iod_mem_frag_t)); - } - k = 0; - + + k = 0; fprintf (stderr,"COMPACTOR WRITE k: %lli start_reqs: %lli, mem_reqs: %lli\n", k, start_reqs, mem_reqs); for ( j = start_reqs; j < (start_reqs + mem_reqs); j++){ if ((j == curr_j) && (bytes_left)){ - mem_desc->frag[k].addr = (void *)(curr_offset); - mem_desc->frag[k].len = bytes_left; + mem_desc[n].frag[k].addr = (void *)(curr_offset); + mem_desc[n].frag[k].len = bytes_left; } else{ - mem_desc->frag[k].addr = (void *) + mem_desc[n].frag[k].addr = (void *) (list[request_counter].mblocks[j].offset); - mem_desc->frag[k].len = list[request_counter].mblocks[j].len; + mem_desc[n].frag[k].len = list[request_counter].mblocks[j].len; } k++; } - curr_k = k; + curr_k += k; +#if DEBUG_COMPACTOR for ( j = 0 ; j < k; j++){ - - ptr = (int *)mem_desc->frag[j].addr; + ptr = (int *)(mem_desc[n].frag[j].addr); fprintf(stderr, "COMPACTOR MERGED WRITE IOD_BUFFER j: %lli, k: %lli len: %llu\n", - j,k, mem_desc->frag[j].len); - for (ii = 0; ii < mem_desc->frag[j].len/sizeof(int); ii++) + j,k, mem_desc[n].frag[j].len); + for (ii = 0; ii < mem_desc[n].frag[j].len/sizeof(int); ii++) fprintf(stderr, "%d ", ptr[ii]); fprintf(stderr, "\n"); - } fflush(stderr); +#endif } - file_desc = hslabs[n]; + file_desc[n] = hslabs[n]; #if H5VL_IOD_DEBUG for(i=0 ; i<ndims ; i++) { fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n", - i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i], - (size_t)file_desc.block[i], (size_t)file_desc.count[i]); + i, (size_t)file_desc[n].start[i], (size_t)file_desc[n].stride[i], + (size_t)file_desc[n].block[i], (size_t)file_desc[n].count[i]); } -#endif - if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL, - mem_desc /*This is where the memory descriptor goes*/, - &file_desc, - NULL, NULL) < 0) - HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object"); - - - if (NULL != mem_desc){ - iod_mem_frag_t *frag = mem_desc->frag; +#endif + } + + + larray->oh = iod_oh; + larray->hints = NULL; + larray->mem_desc = mem_desc; + larray->io_desc = file_desc; + larray->cs = NULL; + larray->ret = NULL; + + if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL, + mem_desc /*This is where the memory descriptor goes*/, + file_desc, + NULL, NULL) < 0) + HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object"); + + if (NULL != mem_desc){ + for (n = 0; n < num_descriptors; n++){ + iod_mem_frag_t *frag = mem_desc[n].frag; if (NULL != frag){ free(frag); } - free(mem_desc); - mem_desc = NULL; } - + free(mem_desc); + mem_desc = NULL; } + if (NULL != file_desc){ + free(file_desc); + file_desc = NULL; + } if (NULL != hslabs){ @@ -996,7 +1044,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) } free(hslabs); } - /* write from array object */ /* TODO: VV Change checksum once that is fixed*/ if(opened_locally) { @@ -1026,6 +1073,35 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) FUNC_LEAVE_NOAPI(ret_value); }/*H5VL_iod_server_compactor_write*/ + + +int H5VL_iod_server_send_result (void *_list, + int num_requests){ + + + int i; + int ret_value = HG_SUCCESS; + op_data_t *op_data = NULL; + dset_io_in_t *input = NULL; + request_list_t *list = (request_list_t *)_list; + + FUNC_ENTER_NOAPI_NOINIT + + for (i = 0; i< num_requests; i++){ + if (list[i].merged != MERGED){ + op_data = (op_data_t *)list[i].op_data; + input = (dset_io_in_t *)op_data->input; + if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &ret_value)) + HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, HG_FAIL, "can't send result of write to client"); + input = (dset_io_in_t *)H5MM_xfree(input); + op_data = (op_data_t *)H5MM_xfree(op_data); + } + } + + done: + FUNC_LEAVE_NOAPI(ret_value); + +} /*------------------------------------------------------------------------- diff --git a/src/H5VLiod_server.h b/src/H5VLiod_server.h index 666bbb8..cb942ee 100644 --- a/src/H5VLiod_server.h +++ b/src/H5VLiod_server.h @@ -115,7 +115,12 @@ H5_DLL void H5VL_iod_server_dset_compactor_cb(AXE_engine_t axe_engine, size_t num_s_parents, AXE_task_t s_parents[], void *queue); -H5_DLL int H5VL_iod_server_compactor_write (void *list, int num_requests); +H5_DLL int H5VL_iod_server_compactor_write (void *list, int num_requests, + iod_array_io_t *array_write); + +/*H5_DLL int H5VL_iod_server_compactor_write (void *list, int num_requests); */ + +H5_DLL int H5VL_iod_server_send_result (void *list, int num_requests); H5_DLL int H5VL_iod_reconstruct_parents (AXE_engine_t axe_engine, axe_ids_t *old_parents, @@ -125,6 +130,7 @@ H5_DLL void H5VL_iod_server_file_create_cb(AXE_engine_t axe_engine, size_t num_n_parents, AXE_task_t n_parents[], size_t num_s_parents, AXE_task_t s_parents[], void *op_data); + H5_DLL void H5VL_iod_server_file_open_cb(AXE_engine_t axe_engine, size_t num_n_parents, AXE_task_t n_parents[], size_t num_s_parents, AXE_task_t s_parents[], |