summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishwanath Venkatesan <vish@hdfgroup.org>2013-07-25 23:14:46 (GMT)
committerVishwanath Venkatesan <vish@hdfgroup.org>2013-07-25 23:14:46 (GMT)
commit0721ed12752da7dc22044236da176b5902c83e54 (patch)
treef81cc07d438f329332b6e25a7ad56387c54bd6a9
parent315e2305153d0b67dd78e119db694accfcce85a3 (diff)
downloadhdf5-0721ed12752da7dc22044236da176b5902c83e54.zip
hdf5-0721ed12752da7dc22044236da176b5902c83e54.tar.gz
hdf5-0721ed12752da7dc22044236da176b5902c83e54.tar.bz2
[svn-r23934] Merging multiple datasets for IOD. Logic in place -- implemented
-rw-r--r--src/H5VLiod_dset.c184
-rw-r--r--src/H5VLiod_server.h8
2 files changed, 137 insertions, 55 deletions
diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c
index 2d88976..67f58e1 100644
--- a/src/H5VLiod_dset.c
+++ b/src/H5VLiod_dset.c
@@ -627,7 +627,7 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
request_list_t *wlist=NULL;
dataset_container_t *dlist=NULL;
int nentries = 0, ndatasets = 0;
-
+ iod_array_io_t *array_write = NULL;
FUNC_ENTER_NOAPI_NOINIT
#if DEBUG_COMPACTOR
@@ -640,7 +640,7 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
usleep(1000);
-
+
while(pthread_mutex_trylock(&lock) != 0);
compactor_queue_flag = 0;
curr_queue = NULL;
@@ -659,20 +659,41 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
ret_value);
}
#endif
-
-#if 1
+
+ array_write = (iod_array_io_t *) malloc (ndatasets *
+ sizeof (iod_array_io_t));
+ if (NULL == array_write){
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "No space to allocate iod_array");
+ }
+
for ( i = 0; i < ndatasets; i ++){
H5VL_iod_compact_requests (wlist, &nentries,dlist[i].num_requests,
dlist[i].requests);
- }
-#endif
- if (CP_SUCCESS != H5VL_iod_server_compactor_write (wlist, nentries)){
+ if (CP_SUCCESS != H5VL_iod_server_compactor_write (wlist, nentries,
+ &array_write[i])){
#if DEBUG_COMPACTOR
- fprintf (stderr,"COMPACTOR CB: compactor write failed \n");
+ fprintf (stderr,"COMPACTOR CB: compactor write failed \n");
#endif
- HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever write failed\n");
- }
+ HGOTO_ERROR(H5E_SYM, H5E_WRITEERROR, CP_FAIL, "Lower lever write failed\n");
+ }
+ }
+ /*Call here iod_array_write_list
+ array_write array has been constructed.
+ The COH value can come from any of the list requests
+ Once that is done --> execute the func : H5VL_iod_server_send_result
+ This sends results to all the clients about their completion.
+ There is a flaw in the IOD construction of io_array_t datastructure.
+ They seem to have mem_desc and io_desc as pointers and there is no-way to specify
+ the number of descriptors!.. We need that to free the memory descriptors.
+ For now we can leave it at this!
+ */
+
+
+ if (NULL != array_write){
+ free(array_write);
+ array_write = NULL;
+ }
if (NULL != wlist){
free(wlist);
wlist = NULL;
@@ -705,7 +726,8 @@ H5VL_iod_server_dset_read_cb(AXE_engine_t UNUSED axe_engine,
*-------------------------------------------------------------------------
*/
-int H5VL_iod_server_compactor_write (void *_list, int num_requests)
+int H5VL_iod_server_compactor_write (void *_list, int num_requests,
+ iod_array_io_t *larray)
{
int ret_value = CP_SUCCESS;
@@ -726,11 +748,12 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
hbool_t opened_locally = FALSE;
iod_hyperslab_t *hslabs = NULL;
iod_mem_desc_t *mem_desc = NULL;
- iod_array_iodesc_t file_desc;
+ iod_array_iodesc_t *file_desc=NULL;
hsize_t num_bytes = 0, mem_reqs = 0, start_reqs = 0;
hsize_t num_elems = 0, k = 0, j=0, curr_k = 0;
hsize_t curr_j = 0, bytes_left = 0, curr_offset = 0;
+
FUNC_ENTER_NOAPI_NOINIT
if (num_requests <= 0){
@@ -755,6 +778,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
op_data = (op_data_t *)list[request_counter].op_data;
input = (dset_io_in_t *)op_data->input;
+ coh = input->coh;
iod_oh = input->iod_oh;
iod_id = input->iod_id;
bulk_handle = input->bulk_handle;
@@ -785,8 +809,10 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
op_data = (op_data_t *)list[request_counter].op_data;
input = (dset_io_in_t *)op_data->input;
- iod_oh = input->iod_oh;
+ coh = input->coh;
+ iod_oh = input->iod_oh;
iod_id = input->iod_id;
+
space_id = list[request_counter].selection_id;
dst_id = input->dset_type_id;
@@ -846,8 +872,20 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
}
}
#endif
-
+ mem_desc = (iod_mem_desc_t *) malloc
+ (sizeof(iod_mem_desc_t) * (size_t)num_descriptors);
+ if (NULL == mem_desc){
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors");
+ }
+
+ file_desc = (iod_array_iodesc_t *) malloc
+ (sizeof(iod_array_iodesc_t) * (size_t)num_descriptors);
+ if (NULL == file_desc){
+ HGOTO_ERROR(H5E_SYM, H5E_NOSPACE, FAIL, "can't allocate iod array descriptors");
+ }
+
curr_j = 0;
+
for(n=0 ; n<num_descriptors ; n++) {
num_bytes = 0;
@@ -863,17 +901,17 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
buf_ptr = (uint8_t *)buf;
/* set the memory descriptor */
- mem_desc = (iod_mem_desc_t *) malloc (sizeof (iod_mem_desc_t));
- mem_desc->frag = (iod_mem_frag_t *) malloc (sizeof (iod_mem_frag_t));
- mem_desc->nfrag = 1;
- mem_desc->frag[0].addr = (void *)buf_ptr;
- mem_desc->frag[0].len = (iod_size_t)num_bytes;
+
+ mem_desc[n].frag = (iod_mem_frag_t *) malloc (sizeof (iod_mem_frag_t));
+ mem_desc[n].nfrag = 1;
+ mem_desc[n].frag->addr = (void *)buf_ptr;
+ mem_desc[n].frag->len = (iod_size_t)num_bytes;
buf_ptr += num_bytes;
if (NULL != buf){
free(buf);
}
}
-
+
if (list[request_counter].merged == MERGED){
#if DEBUG_COMPACTOR
@@ -920,71 +958,81 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
/*Determined the number of entries in the memory block for this hslab
descriptor*/
- if (NULL == mem_desc){
- mem_desc = (iod_mem_desc_t *) malloc (sizeof(iod_mem_desc_t));
- mem_desc->nfrag = mem_reqs;
- mem_desc->frag =
+ mem_desc[n].nfrag = mem_reqs;
+ mem_desc[n].frag =
(iod_mem_frag_t *) malloc ((int)mem_reqs * sizeof(iod_mem_frag_t));
- }
- k = 0;
-
+
+ k = 0;
fprintf (stderr,"COMPACTOR WRITE k: %lli start_reqs: %lli, mem_reqs: %lli\n",
k, start_reqs, mem_reqs);
for ( j = start_reqs; j < (start_reqs + mem_reqs); j++){
if ((j == curr_j) && (bytes_left)){
- mem_desc->frag[k].addr = (void *)(curr_offset);
- mem_desc->frag[k].len = bytes_left;
+ mem_desc[n].frag[k].addr = (void *)(curr_offset);
+ mem_desc[n].frag[k].len = bytes_left;
}
else{
- mem_desc->frag[k].addr = (void *)
+ mem_desc[n].frag[k].addr = (void *)
(list[request_counter].mblocks[j].offset);
- mem_desc->frag[k].len = list[request_counter].mblocks[j].len;
+ mem_desc[n].frag[k].len = list[request_counter].mblocks[j].len;
}
k++;
}
- curr_k = k;
+ curr_k += k;
+#if DEBUG_COMPACTOR
for ( j = 0 ; j < k; j++){
-
- ptr = (int *)mem_desc->frag[j].addr;
+ ptr = (int *)(mem_desc[n].frag[j].addr);
fprintf(stderr, "COMPACTOR MERGED WRITE IOD_BUFFER j: %lli, k: %lli len: %llu\n",
- j,k, mem_desc->frag[j].len);
- for (ii = 0; ii < mem_desc->frag[j].len/sizeof(int); ii++)
+ j,k, mem_desc[n].frag[j].len);
+ for (ii = 0; ii < mem_desc[n].frag[j].len/sizeof(int); ii++)
fprintf(stderr, "%d ", ptr[ii]);
fprintf(stderr, "\n");
-
}
fflush(stderr);
+#endif
}
- file_desc = hslabs[n];
+ file_desc[n] = hslabs[n];
#if H5VL_IOD_DEBUG
for(i=0 ; i<ndims ; i++) {
fprintf(stderr, "Dim %d: start %zu stride %zu block %zu count %zu\n",
- i, (size_t)file_desc.start[i], (size_t)file_desc.stride[i],
- (size_t)file_desc.block[i], (size_t)file_desc.count[i]);
+ i, (size_t)file_desc[n].start[i], (size_t)file_desc[n].stride[i],
+ (size_t)file_desc[n].block[i], (size_t)file_desc[n].count[i]);
}
-#endif
- if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL,
- mem_desc /*This is where the memory descriptor goes*/,
- &file_desc,
- NULL, NULL) < 0)
- HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object");
-
-
- if (NULL != mem_desc){
- iod_mem_frag_t *frag = mem_desc->frag;
+#endif
+ }
+
+
+ larray->oh = iod_oh;
+ larray->hints = NULL;
+ larray->mem_desc = mem_desc;
+ larray->io_desc = file_desc;
+ larray->cs = NULL;
+ larray->ret = NULL;
+
+ if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL,
+ mem_desc /*This is where the memory descriptor goes*/,
+ file_desc,
+ NULL, NULL) < 0)
+ HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object");
+
+ if (NULL != mem_desc){
+ for (n = 0; n < num_descriptors; n++){
+ iod_mem_frag_t *frag = mem_desc[n].frag;
if (NULL != frag){
free(frag);
}
- free(mem_desc);
- mem_desc = NULL;
}
-
+ free(mem_desc);
+ mem_desc = NULL;
}
+ if (NULL != file_desc){
+ free(file_desc);
+ file_desc = NULL;
+ }
if (NULL != hslabs){
@@ -996,7 +1044,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
}
free(hslabs);
}
-
/* write from array object */
/* TODO: VV Change checksum once that is fixed*/
if(opened_locally) {
@@ -1026,6 +1073,35 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
FUNC_LEAVE_NOAPI(ret_value);
}/*H5VL_iod_server_compactor_write*/
+
+
+int H5VL_iod_server_send_result (void *_list,
+ int num_requests){
+
+
+ int i;
+ int ret_value = HG_SUCCESS;
+ op_data_t *op_data = NULL;
+ dset_io_in_t *input = NULL;
+ request_list_t *list = (request_list_t *)_list;
+
+ FUNC_ENTER_NOAPI_NOINIT
+
+ for (i = 0; i< num_requests; i++){
+ if (list[i].merged != MERGED){
+ op_data = (op_data_t *)list[i].op_data;
+ input = (dset_io_in_t *)op_data->input;
+ if(HG_SUCCESS != HG_Handler_start_output(op_data->hg_handle, &ret_value))
+ HDONE_ERROR(H5E_SYM, H5E_WRITEERROR, HG_FAIL, "can't send result of write to client");
+ input = (dset_io_in_t *)H5MM_xfree(input);
+ op_data = (op_data_t *)H5MM_xfree(op_data);
+ }
+ }
+
+ done:
+ FUNC_LEAVE_NOAPI(ret_value);
+
+}
/*-------------------------------------------------------------------------
diff --git a/src/H5VLiod_server.h b/src/H5VLiod_server.h
index 666bbb8..cb942ee 100644
--- a/src/H5VLiod_server.h
+++ b/src/H5VLiod_server.h
@@ -115,7 +115,12 @@ H5_DLL void H5VL_iod_server_dset_compactor_cb(AXE_engine_t axe_engine,
size_t num_s_parents, AXE_task_t s_parents[],
void *queue);
-H5_DLL int H5VL_iod_server_compactor_write (void *list, int num_requests);
+H5_DLL int H5VL_iod_server_compactor_write (void *list, int num_requests,
+ iod_array_io_t *array_write);
+
+/*H5_DLL int H5VL_iod_server_compactor_write (void *list, int num_requests); */
+
+H5_DLL int H5VL_iod_server_send_result (void *list, int num_requests);
H5_DLL int H5VL_iod_reconstruct_parents (AXE_engine_t axe_engine,
axe_ids_t *old_parents,
@@ -125,6 +130,7 @@ H5_DLL void H5VL_iod_server_file_create_cb(AXE_engine_t axe_engine,
size_t num_n_parents, AXE_task_t n_parents[],
size_t num_s_parents, AXE_task_t s_parents[],
void *op_data);
+
H5_DLL void H5VL_iod_server_file_open_cb(AXE_engine_t axe_engine,
size_t num_n_parents, AXE_task_t n_parents[],
size_t num_s_parents, AXE_task_t s_parents[],