diff options
author | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-25 19:22:22 (GMT) |
---|---|---|
committer | Vishwanath Venkatesan <vish@hdfgroup.org> | 2013-07-25 19:22:22 (GMT) |
commit | 315e2305153d0b67dd78e119db694accfcce85a3 (patch) | |
tree | 56b499c4288e7c5c35a22addd7b45e7ec2d636c0 | |
parent | 89f2fd84bd4474cdff15cbc7f0e5b7364b251bfe (diff) | |
download | hdf5-315e2305153d0b67dd78e119db694accfcce85a3.zip hdf5-315e2305153d0b67dd78e119db694accfcce85a3.tar.gz hdf5-315e2305153d0b67dd78e119db694accfcce85a3.tar.bz2 |
[svn-r23933] 1. Overlapping writes fixes for more complicated scenarios handled.
2. Tests for testing compactor!
-rw-r--r-- | src/H5VLiod_compactor.c | 273 | ||||
-rw-r--r-- | src/H5VLiod_dset.c | 89 | ||||
-rw-r--r-- | src/H5VLiod_server.c | 16 | ||||
-rw-r--r-- | test/compactor_write_tests/Makefile | 12 | ||||
-rw-r--r-- | test/compactor_write_tests/multiple-writes-test1.c | 170 | ||||
-rw-r--r-- | test/compactor_write_tests/multiple-writes-test2.c | 168 | ||||
-rw-r--r-- | test/compactor_write_tests/multiple-writes-test3.c | 254 | ||||
-rw-r--r-- | test/compactor_write_tests/multiple-writes-test4.c | 254 | ||||
-rw-r--r-- | test/compactor_write_tests/multiple-writes-test5.c | 254 | ||||
-rw-r--r-- | test/compactor_write_tests/parallel_write.c | 126 | ||||
-rw-r--r-- | test/compactor_write_tests/test_server.c | 48 |
11 files changed, 1490 insertions, 174 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c index 6ad92fe..997f054 100644 --- a/src/H5VLiod_compactor.c +++ b/src/H5VLiod_compactor.c @@ -79,10 +79,20 @@ static size_t H5VL_iod_copy_desc (block_container_t *sm_block, size_t start, size_t j); +static size_t H5VL_iod_copy_desc_reduced (block_container_t *sm_block, + block_container_t *tmp_block, + size_t start, size_t counter, + size_t j); + static void H5VL_print_block_container (block_container_t *cont, size_t num); + +static int H5VL_check_overlapped_offsets(hsize_t start_i, hsize_t start_j, + hsize_t end_i, hsize_t end_j); + + /*---------------------------------------------------------------------*/ @@ -475,9 +485,12 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list, fprintf(stderr,"%s:%d N-entires : %d\n", __FILE__,__LINE__, request_id); fprintf(stderr,"%s:%d Compactor Request List \n",__FILE__,__LINE__); - fprintf(stderr,"Compactor-TUPLE: id -- AXE-id -- dataset -- dataspace -- fileblocks -- memblocks -- num_peers\n"); + fprintf(stderr, + "Compactor-TUPLE: id -- AXE-id -- dataset -- dataspace \ + -- fileblocks -- memblocks -- num_peers\n"); for ( i = 0; i < request_id; i++){ - fprintf(stderr, "Compactor-Entry: %d -- %llu -- %d -- %d -- %zd -- %zd -- %d\n", + fprintf(stderr, + "Compactor-Entry: %d -- %llu -- %d -- %d -- %zd -- %zd -- %d\n", newlist[i].request_id, newlist[i].axe_id, newlist[i].dataset_id, @@ -746,6 +759,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests, fprintf (stderr,"%s:%d Merged with : %d\n", __FILE__, __LINE__, list[request_list[1]].selection_id); #endif + merge_flag = 0; m_elmnt_size = list[request_list[0]].elementsize; if ( 0 == H5VL_iod_select_overlap ( current_space, @@ -894,6 +908,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests, free(lselected_req); lselected_req = NULL; } +#if DEBUG_COMPACTOR for (i = 0; i < original_requests; i ++){ fprintf(stderr,"%s:%d, Request :%d --> %d\n", __FILE__, @@ -901,7 +916,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests, i, list[i].merged); } - +#endif *total_requests = original_requests; done: @@ -909,6 +924,25 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests, }/*end H5VL_iod_compact_requests*/ +static int H5VL_check_overlapped_offsets(hsize_t start_i, hsize_t start_j, + hsize_t end_i, hsize_t end_j){ + + if (start_i < start_j){ + if (start_j >= end_i){ + return 0; + } + return 1; + } + else if (start_i > start_j){ + if (start_i >= end_j){ + return 0; + } + return 1; + } + else + return 1; +} + /*------------------------------------------------------------------------- * Function: H5VL_iod_reconstruct_overlapped_request * @@ -937,26 +971,38 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block, size_t rev_mblks, rev_fblks; block_container_t *rev_sm_block=NULL, *rev_sf_block=NULL; int ret_value = CP_SUCCESS; + hsize_t start_i , start_j, end_i, end_j; int changed_cnt = 0, ii; FUNC_ENTER_NOAPI(NULL) rev_mblks = 0; rev_fblks = 0; - rev_sm_block = (block_container_t *) malloc (4 * sizeof (block_container_t)); - if (NULL == rev_sm_block) - HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sm_block"); - rev_sf_block = (block_container_t *) malloc (4 * sizeof (block_container_t)); - if (NULL == rev_sf_block) - HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sf_block"); +#if DEBUG_COMPACTOR + fprintf (stderr, + "in %s:%d sf[%zd].offset : %lli, sf[%zd].len: %zd\n", + __FILE__, __LINE__,i,sf_block[i].offset,i,sf_block[i].len); + fprintf (stderr, + "in %s:%d sf[%zd].offset: %lli, sf[%zd].len : %zd\n", + __FILE__,__LINE__,j,sf_block[j].offset,j,sf_block[j].len); +#endif + + start_i = sf_block[i].offset; + start_j = sf_block[j].offset; + end_i = sf_block[i].offset + sf_block[i].len; + end_j = sf_block[j].offset + sf_block[j].len; + + if(H5VL_check_overlapped_offsets(start_i, start_j, + end_i, end_j)){ + + rev_sm_block = (block_container_t *) malloc (4 * sizeof (block_container_t)); + if (NULL == rev_sm_block) + HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sm_block"); + rev_sf_block = (block_container_t *) malloc (4 * sizeof (block_container_t)); + if (NULL == rev_sf_block) + HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sf_block"); - fprintf (stderr, "sf[%zd].offset : %lli, sf[%zd].len: %zd, sf[%zd].offset: %lli, sf[%zd].len : %zd\n", - i,sf_block[i].offset,i,sf_block[i].len,j,sf_block[j].offset,j,sf_block[j].len); - - if(sf_block[i].offset + sf_block[i].len > - sf_block[j].offset){ - if (sf_block[i].offset + sf_block[i].len <= sf_block[j].offset + sf_block[j].len ){ /*This means the latest write entry starts in-between @@ -976,11 +1022,9 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block, changed_cnt++; } else{ - /* j's start offset is different than I but in total they - write the same or j write more.. so modify i's len.. - the offsets still remain untouched!*/ - rem_len = (size_t)(sf_block[i].offset + sf_block[i].len) - - sf_block[j].offset; + /*j's start offset is greater than + i's + */ + rem_len = (size_t)(sf_block[j].offset - sf_block[i].offset); rev_sf_block[rev_fblks].offset = sf_block[i].offset; rev_sm_block[rev_mblks].offset = sm_block[i].offset; rev_sf_block[rev_fblks].len = rem_len; @@ -1039,7 +1083,8 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block, rev_sf_block[rev_fblks].offset = sf_block[j].offset + sf_block[j].len; rev_sm_block[rev_mblks].offset = - sm_block[i].offset + ((sf_block[j].offset +sf_block[j].len) - sf_block[i].offset); + sm_block[i].offset + + ((sf_block[j].offset +sf_block[j].len) - sf_block[i].offset); rev_sf_block[rev_fblks].len = rem_len; rev_sm_block[rev_mblks].len = rem_len; changed[changed_cnt] = i; @@ -1054,6 +1099,7 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block, } else{ + rem_len = (sf_block[j].offset + sf_block[j].len) - sf_block[i].offset; rev_sf_block[rev_fblks].offset = @@ -1095,8 +1141,10 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block, fprintf (stderr,"rev_fblks: %zd\n", rev_fblks); for (jj = 0; jj < rev_fblks; jj++){ - fprintf (stderr, "rev_blks[%zd]: %lli, rev_blks[%zd]: %zd\n", - jj, rev_sf_block[jj].offset, jj, rev_sf_block[jj].len); + fprintf (stderr, + "rev_blks[%zd]: %lli, rev_blks[%zd]: %zd\n", + jj, rev_sf_block[jj].offset, jj, + rev_sf_block[jj].len); } fflush(stderr); #endif @@ -1218,7 +1266,6 @@ int H5VL_iod_construct_merged_request (request_list_t *list, sm_block[blck_cnt].offset = list[request_list[i]].mblocks[j].offset; sm_block[blck_cnt].len = list[request_list[i]].mblocks[j].len; axe_id_list[blck_cnt] = list[request_list[i]].axe_id; - #if DEBUG_COMPACTOR fprintf(stderr, "in %s:%d foffset: %lli len: %zd, moffset: %lli, len: %zd\n", __FILE__, __LINE__, @@ -1233,10 +1280,11 @@ int H5VL_iod_construct_merged_request (request_list_t *list, assert(blck_cnt == fblks); loop_blks = fblks; + + /* =============================================================================================*/ /* If its the same client, we do allow overlaps*/ /* In that case its assumed that the requests flow in temporally So we can replace the memory buffer with the latest request */ - if (overlap_flag){ rev_mblks = 0; @@ -1268,22 +1316,25 @@ int H5VL_iod_construct_merged_request (request_list_t *list, ii, j, changed, &changed_cnt)){ - HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, CP_FAIL, "overlapped request construct error\n"); + HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, CP_FAIL, + "overlapped request construct error\n"); } if (changed_cnt == 0){ #if DEBUG_COMPACTOR fprintf(stderr, - "Continuing no change with that combination.. the offsets remain as follows: \n"); - H5VL_print_block_container (f_block, loop_blks); - H5VL_print_block_container (m_block, loop_blks); + "Continuing no change with that combination..\n"); + /* H5VL_print_block_container (f_block, loop_blks); + H5VL_print_block_container (m_block, loop_blks); */ #endif continue; } #if DEBUG_COMPACTOR fprintf (stderr, - "in %s:%d Completed reconstructing overlapped request changed: %zd fblks: %zd, mblks: %zd\n", + "in %s:%d Completed reconstructing request\n", + __FILE__, __LINE__); + fprintf(stderr,"in %s:%d changed: %zd fblks: %zd, mblks: %zd\n", __FILE__, __LINE__, changed_cnt ,rev_fblks, rev_mblks); for (k = 0; k < rev_fblks; k++){ fprintf (stderr, "%zd: OFFSET: %lli, LEN: %zd\n", @@ -1291,74 +1342,58 @@ int H5VL_iod_construct_merged_request (request_list_t *list, } #endif - tmp_block = (block_container_t *) malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t)); + tmp_block = + (block_container_t *) + malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t)); if (NULL == tmp_block){ - HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for tmp_block"); + HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL, + "Allocation error for tmp_block"); } - tmpf_block = (block_container_t *) malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t)); + tmpf_block = + (block_container_t *) + malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t)); if (NULL == tmpf_block){ - HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for tmpf_block"); + HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL, + "Allocation error for tmpf_block"); } new_blks = 0; switch(changed_cnt){ case 1: if (changed[0] == (int)ii){ -#if DEBUG_COMPACTOR - fprintf (stderr,"Enters CASE 1 with changed_cnt : %d\n", - changed_cnt); -#endif - H5VL_iod_copy_desc(f_block, tmpf_block, 0, ii); new_blks += H5VL_iod_copy_desc(m_block, tmp_block, 0,ii); - -#if DEBUG_COMPACTOR - fprintf (stderr,"***** new_blks :%zd\n", new_blks); H5VL_print_block_container (tmpf_block, new_blks); - H5VL_print_block_container (tmp_block, new_blks); -#endif tmpf_block[new_blks].offset = tf_block[0].offset; tmpf_block[new_blks].len = tf_block[0].len; tmp_block[new_blks].offset = tm_block[0].offset; tmp_block[new_blks].len = tm_block[0].len; new_blks++; - -#if DEBUG_COMPACTOR H5VL_print_block_container (tmpf_block, new_blks); - H5VL_print_block_container (tmp_block, new_blks); - - fprintf (stderr, "ii+1 : %zd, j: %zd\n", ii+1, j); -#endif new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, ii+1, j); H5VL_iod_copy_desc(m_block, tmp_block, ii+1, j); - + H5VL_print_block_container (tmpf_block, new_blks); + H5VL_iod_copy_desc_reduced(f_block, tmpf_block, + j+1, new_blks, + loop_blks); + new_blks += H5VL_iod_copy_desc_reduced(m_block, + tmp_block, + j+1, new_blks, + loop_blks); loop_blks -= 1; - - H5VL_iod_copy_desc(f_block, tmpf_block, j+1, loop_blks); - H5VL_iod_copy_desc(m_block, tmp_block, j+1, loop_blks); -#if DEBUG_COMPACTOR - H5VL_print_block_container (tmpf_block, loop_blks); - H5VL_print_block_container (tmp_block, loop_blks); -#endif - ii -= 1; + H5VL_print_block_container (tmpf_block, new_blks); + fprintf (stderr, "new_blks: %zd, loop_blks: %zd\n", + new_blks, loop_blks); + if (!((int)(ii - 1) < 0)) + ii -= 1; } break; case 2: -#if DEBUG_COMPACTOR - fprintf (stderr,"Enters CASE 2 with changed_cnt : %d\n", - changed_cnt); -#endif - for (i = 0; i < 2; i++){ if (changed[i] == (int)ii){ new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, 0, ii); H5VL_iod_copy_desc(m_block, tmp_block, 0, ii); -#if DEBUG_COMPACTOR - fprintf (stderr,"***** new_blks :%zd\n", new_blks); - H5VL_print_block_container (tmpf_block, new_blks); - H5VL_print_block_container (tmp_block, new_blks); -#endif tmpf_block[new_blks].offset = tf_block[i].offset; tmpf_block[new_blks].len = tf_block[i].len; tmp_block[new_blks].offset = tm_block[i].offset; @@ -1385,22 +1420,14 @@ int H5VL_iod_construct_merged_request (request_list_t *list, case 3: for (i = 0; i < 3; i ++){ if (changed[i] == (int)ii){ -#if DEBUG_COMPACTOR - fprintf(stderr,"in Changed CASE 3 cnt: %zd and changed[%d]: %d, loop_blck: %zd\n", - changed_cnt,i,changed[i], loop_blks ); - -#endif new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, 0, ii); H5VL_iod_copy_desc(m_block, tmp_block, 0, ii); -#if DEBUG_COMPACTOR - H5VL_print_block_container (tmpf_block, new_blks); - H5VL_print_block_container (tmp_block, new_blks); -#endif tmpf_block[new_blks].offset = tf_block[i].offset; tmpf_block[new_blks].len = tf_block[i].len; tmp_block[new_blks].offset = tm_block[i].offset; tmp_block[new_blks].len = tm_block[i].len; + new_blks++; i++; loop_blks += 1; @@ -1409,19 +1436,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list, tmp_block[new_blks].offset = tm_block[i].offset; tmp_block[new_blks].len = tm_block[i].len; new_blks++; -#if DEBUG_COMPACTOR - fprintf (stderr, "*********** loop_blks: %zd, newblks: %zd\n", - loop_blks, new_blks); - H5VL_print_block_container (tmpf_block, new_blks); - H5VL_print_block_container (tmp_block, new_blks); -#endif - } else{ -#if DEBUG_COMPACTOR - fprintf(stderr,"in %s:%d Changed CASE 3 with Changed_cnt: %zd and changed[%d]: %d\n", - __FILE__, __LINE__,changed_cnt,i,changed[i] ); -#endif if (ii+1 < j){ new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, ii+1, j); H5VL_iod_copy_desc(m_block, tmp_block, ii+1, j); @@ -1431,23 +1447,13 @@ int H5VL_iod_construct_merged_request (request_list_t *list, tmp_block[new_blks].offset = tm_block[i].offset; tmp_block[new_blks].len = tm_block[i].len; new_blks++; -#if DEBUG_COMPACTOR - fprintf (stderr, "*********** loop_blks: %zd, newblks: %zd, j: %zd\n", - loop_blks, new_blks, j); - H5VL_print_block_container (tmpf_block, new_blks); - H5VL_print_block_container (tmp_block, new_blks); -#endif - if (new_blks < loop_blks){ - new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, j+1, loop_blks); + new_blks += + H5VL_iod_copy_desc(f_block, tmpf_block, j+1, loop_blks); H5VL_iod_copy_desc(m_block, tmp_block, j+1, loop_blks); } } } -#if DEBUG_COMPACTOR - fprintf (stderr, "Total revised blocks : %zd \n", - new_blks); -#endif break; default: ret_value = CP_FAIL; @@ -1456,17 +1462,10 @@ int H5VL_iod_construct_merged_request (request_list_t *list, if (new_blks < loop_blks){ H5VL_iod_copy_desc(f_block, tmpf_block, new_blks, loop_blks); - new_blks += H5VL_iod_copy_desc(m_block, tmp_block, new_blks, loop_blks); + new_blks += + H5VL_iod_copy_desc(m_block, tmp_block, new_blks, loop_blks); } - if (NULL != m_block){ - free(m_block); - m_block = NULL; - } - if (NULL != f_block){ - free(f_block); - f_block = NULL; - } if (NULL != tf_block){ free(tf_block); tf_block = NULL; @@ -1475,14 +1474,27 @@ int H5VL_iod_construct_merged_request (request_list_t *list, free(tm_block); tm_block = NULL; } + + if (NULL != m_block){ + free(m_block); + m_block = NULL; + } + if (NULL != f_block){ + free(f_block); + f_block = NULL; + } f_block = tmpf_block; m_block = tmp_block; + } #if DEBUG_COMPACTOR - fprintf (stderr,"***********new_blks: %zd loop_blks: %zd**********\n", - new_blks, loop_blks); + fprintf (stderr, + "***********AFTER ITERATION i: %zd, j: %zd**************\n", + ii, j); H5VL_print_block_container (f_block, loop_blks); H5VL_print_block_container (m_block, loop_blks); + fprintf (stderr, + "########################################################\n"); #endif } @@ -1508,7 +1520,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list, sorted = (int *) malloc (blck_cnt * sizeof(int)); if ( NULL == sorted ){ - HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for sorted array"); + HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL, + "Allocation error for sorted array"); } @@ -1516,11 +1529,12 @@ int H5VL_iod_construct_merged_request (request_list_t *list, #if DEBUG_COMPACTOR - fprintf (stderr, "********************* AFTER Adjusting for overlap **************\n"); - + fprintf (stderr, + "********************* AFTER Adjusting for overlap **************\n"); for ( j = 0; j < blck_cnt; j++){ - fprintf(stderr, "in %s:%d sorted_foffset: %lli sorted_len: %zd, sorted_moffset: %lli, sorted_mlen: %zd\n", - __FILE__, __LINE__, + fprintf(stderr, + "sorted_foffset: %lli sorted_len: %zd,\ + sorted_moffset: %lli, sorted_mlen: %zd\n", sf_block[sorted[j]].offset, sf_block[sorted[j]].len, sm_block[sorted[j]].offset, sm_block[sorted[j]].len); } @@ -1637,11 +1651,13 @@ static void H5VL_print_block_container (block_container_t *cont, size_t num){ size_t k; +#if DEBUG_COMPACTOR for (k = 0; k < num; k++){ fprintf (stderr, "%zd: block.offset: %lli, block.len: %zd \n", k, cont[k].offset, cont[k].len); } +#endif } @@ -1652,15 +1668,30 @@ static size_t H5VL_iod_copy_desc (block_container_t *sm_block, size_t j){ size_t i, cnt = 0; -#if DEBUG_COMPACTOR - fprintf (stderr,"start: %zd, end: %zd\n", - start, j); -#endif + for (i = start; i < j; i++){ tmp_block[i].offset = sm_block[i].offset; tmp_block[i].len = sm_block[i].len; cnt++; } + + return cnt; +} + +static size_t H5VL_iod_copy_desc_reduced (block_container_t *sm_block, + block_container_t *tmp_block, + size_t start, size_t counter, + size_t j){ + + size_t i, cnt = 0; + + for (i = start; i < j; i++){ + tmp_block[counter].offset = sm_block[i].offset; + tmp_block[counter].len = sm_block[i].len; + counter++; + cnt++; + } + return cnt; } diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c index 2d67178..2d88976 100644 --- a/src/H5VLiod_dset.c +++ b/src/H5VLiod_dset.c @@ -768,6 +768,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter+1); size = list[request_counter].mem_length; buf = (void *)list[request_counter].mem_buf; + #if DEBUG_COMPACTOR ptr = (int *) buf; fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :", @@ -789,11 +790,12 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) space_id = list[request_counter].selection_id; dst_id = input->dset_type_id; +#if DEBUG_COMPACTOR fprintf (stderr,"space_id: %d, selection_id: %d\n", space_id, list[request_counter].selection_id); - /*Even in the case its not merged the buffer was already extracted*/ fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", request_counter+1); +#endif } dst_size = H5Tget_size(dst_id); @@ -813,10 +815,11 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection"); +#if DEBUG_COMPACTOR fprintf (stderr,"COMPACTOR num_descriptors : %llu, ndims: %d\n", num_descriptors, ndims); - +#endif /* allocate the IOD hyperslab descriptors needed */ if(NULL == (hslabs = (iod_hyperslab_t *)malloc (sizeof(iod_hyperslab_t) * (size_t)num_descriptors))) @@ -849,7 +852,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) num_bytes = 0; num_elems = 1; - + /* determine how many bytes the current descriptor holds */ for(i=0 ; i<ndims ; i++) num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]); @@ -874,22 +877,22 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) if (list[request_counter].merged == MERGED){ #if DEBUG_COMPACTOR - fprintf (stderr, "COMPACTOR WRITE i: %d, num_mblocks: %zd, num_bytes: %lli\n", - request_counter, list[request_counter].num_mblocks, num_bytes); - for ( j = 0; j < list[request_counter].num_mblocks; j++){ + fprintf (stderr, "COMPACTOR WRITE i: %d, num_mblocks: %zd, num_bytes: %lli, start_reqs: %lli\n", + request_counter, list[request_counter].num_mblocks, num_bytes, start_reqs); + for ( j = curr_j; j < list[request_counter].num_mblocks; j++){ fprintf (stderr,"COMPACTOR WRITE j: %lli, len: %zd\n", j, list[request_counter].mblocks[j].len); } #endif k = 0; - mem_reqs = 0; start_reqs = curr_j; + mem_reqs = 0; fprintf (stderr,"COMPACTOR, curr_j: %lli, j: %lli, num_mblocks: %zd\n", curr_j, j, list[request_counter].num_mblocks); + start_reqs = curr_j; for (j = curr_j; j < list[request_counter].num_mblocks; j++){ - - if (k < num_bytes){ + if (k < num_bytes){ if (bytes_left){ k += bytes_left; mem_reqs += 1; @@ -911,57 +914,49 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) } } -#if 0 - fprintf (stderr, - "COMPACTOR WRITE k: %lli mem_reqs: %lli start_reqs: %lli, curr_j: %lli\n", - k, - mem_reqs, - start_reqs, - curr_j); -#endif + curr_j += 1; + fprintf (stderr,"COMPACTOR WRITE start_reqs: %lli, mem_reqs: %lli\n", + start_reqs, mem_reqs); /*Determined the number of entries in the memory block for this hslab descriptor*/ - mem_desc = (iod_mem_desc_t *) malloc (sizeof(iod_mem_desc_t)); - mem_desc->nfrag = mem_reqs; - mem_desc->frag = - (iod_mem_frag_t *) malloc ((int)mem_reqs * sizeof(iod_mem_frag_t)); - k = curr_k; + if (NULL == mem_desc){ + mem_desc = (iod_mem_desc_t *) malloc (sizeof(iod_mem_desc_t)); + mem_desc->nfrag = mem_reqs; + mem_desc->frag = + (iod_mem_frag_t *) malloc ((int)mem_reqs * sizeof(iod_mem_frag_t)); + } + k = 0; - fprintf (stderr,"COMPACTOR WRITE start_reqs: %lli, mem_reqs: %lli\n", - start_reqs, mem_reqs); - for ( j = start_reqs; j < mem_reqs; j++){ + fprintf (stderr,"COMPACTOR WRITE k: %lli start_reqs: %lli, mem_reqs: %lli\n", + k, start_reqs, mem_reqs); + + for ( j = start_reqs; j < (start_reqs + mem_reqs); j++){ if ((j == curr_j) && (bytes_left)){ - mem_desc->frag[k].addr = (void *)(uintptr_t)(curr_offset); + mem_desc->frag[k].addr = (void *)(curr_offset); mem_desc->frag[k].len = bytes_left; } else{ mem_desc->frag[k].addr = (void *) - (uintptr_t)(list[request_counter].mblocks[j].offset); + (list[request_counter].mblocks[j].offset); mem_desc->frag[k].len = list[request_counter].mblocks[j].len; -#if 0 - fprintf(stderr,"COMPACTOR %lli: off: %lli, off: %lli len: %llu\n", - k, - (hsize_t)mem_desc->frag[k].addr, - list[request_counter].mblocks[j].offset, - mem_desc->frag[k].len); -#endif } k++; } curr_k = k; -#if DEBUG_COMPACTOR - for ( j = 0; j < k; j++){ + + for ( j = 0 ; j < k; j++){ + ptr = (int *)mem_desc->frag[j].addr; fprintf(stderr, "COMPACTOR MERGED WRITE IOD_BUFFER j: %lli, k: %lli len: %llu\n", j,k, mem_desc->frag[j].len); for (ii = 0; ii < mem_desc->frag[j].len/sizeof(int); ii++) fprintf(stderr, "%d ", ptr[ii]); fprintf(stderr, "\n"); + } fflush(stderr); -#endif } file_desc = hslabs[n]; @@ -972,29 +967,34 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) (size_t)file_desc.block[i], (size_t)file_desc.count[i]); } #endif - if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL, mem_desc /*This is where the memory descriptor goes*/, &file_desc, - NULL, NULL) < 0) + NULL, NULL) < 0) HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object"); + if (NULL != mem_desc){ - if (NULL != mem_desc->frag){ - free(mem_desc->frag); + iod_mem_frag_t *frag = mem_desc->frag; + if (NULL != frag){ + free(frag); } free(mem_desc); mem_desc = NULL; } + + } + + + + if (NULL != hslabs){ for(n=0 ; n<num_descriptors ; n++) { free(hslabs[n].start); free(hslabs[n].stride); free(hslabs[n].block); free(hslabs[n].count); } - if(hslabs) - free(hslabs); - + free(hslabs); } /* write from array object */ @@ -1008,7 +1008,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests) #if H5VL_IOD_DEBUG fprintf(stderr, "Done with dset write, sending %d response to client\n", ret_value); #endif - } diff --git a/src/H5VLiod_server.c b/src/H5VLiod_server.c index 05959f3..df41e8b 100644 --- a/src/H5VLiod_server.c +++ b/src/H5VLiod_server.c @@ -1396,10 +1396,11 @@ int H5VL_iod_server_dset_compactor(op_data_t *op_data, int request_type) FUNC_ENTER_NOAPI_NOINIT #if DEBUG_COMPACTOR - fprintf (stderr, "id: %d, Enters the the dset_compactor, compactor_flag :%d\n", + fprintf(stderr, "id: %d, Enters the the dset_compactor, compactor_flag :%d\n", request_id, compactor_queue_flag); fflush(stderr); +#endif if (NULL == curr_queue){ if (compactor_queue_flag){ @@ -1410,13 +1411,12 @@ int H5VL_iod_server_dset_compactor(op_data_t *op_data, int request_type) fprintf(stderr,"Compactor Not present with flag : %d\n", compactor_queue_flag); } - else - fprintf(stderr,"Queue exists with compactor_flag : %d, and %d reqs\n", - compactor_queue_flag, - H5VL_iod_get_number_of_requests(curr_queue)); - fflush(stderr); - -#endif + else{ + fprintf(stderr,"Queue exists with compactor_flag : %d, and %d reqs\n", compactor_queue_flag, + H5VL_iod_get_number_of_requests(curr_queue)); + fflush(stderr); + } + input = (dset_io_in_t *)op_data->input; diff --git a/test/compactor_write_tests/Makefile b/test/compactor_write_tests/Makefile new file mode 100644 index 0000000..2286f6a --- /dev/null +++ b/test/compactor_write_tests/Makefile @@ -0,0 +1,12 @@ +all: + h5pcc -o multiple-writes-test1 multiple-writes-test1.c + h5pcc -o multiple-writes-test2 multiple-writes-test2.c + h5pcc -o multiple-writes-test3 multiple-writes-test3.c + h5pcc -o multiple-writes-test4 multiple-writes-test4.c + h5pcc -o multiple-writes-test5 multiple-writes-test5.c + h5pcc -o parallel_write parallel_write.c + h5pcc -o server test_server.c + +clean: + rm -rf multiple-writes-test1 multiple-writes-test2 multiple-writes-test3 multiple-writes-test4 multiple-writes-test5 server *.o *~ + rm -rf parallel_write *.cfg diff --git a/test/compactor_write_tests/multiple-writes-test1.c b/test/compactor_write_tests/multiple-writes-test1.c new file mode 100644 index 0000000..3648bd7 --- /dev/null +++ b/test/compactor_write_tests/multiple-writes-test1.c @@ -0,0 +1,170 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "mpi.h" +#include "hdf5.h" + +#define NX 16 /* dataset dimensions */ +#define NY 8 +#define RANK 2 + +/*Entire write gets replaced +write buffer values start with 10's and increases and writes increase*/ + + +int main (int argc, char **argv){ + + const char filename[] = "eff_file.h5"; + hid_t file_id; + hid_t dataspaceID, dataspace2; + hid_t dataspace3, dataspace4; + hid_t dataspace5, dataspace6; + hid_t dset_id; + hid_t fapl_id, dxpl_id; + + const unsigned int nelem = 60; + int *data = NULL, *data1 = NULL; + unsigned int i = 0; + hsize_t dimsf[2]; + hsize_t count[2], offset[2]; + hsize_t s[2], b[2]; + int my_rank, my_size, ret; + int provided; + hid_t event_q, int_id; + int num_requests = 0; + H5_status_t *status = NULL; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(MPI_THREAD_MULTIPLE != provided) { + fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n"); + exit(1); + } + + + EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL); + + MPI_Comm_rank (MPI_COMM_WORLD, &my_rank); + MPI_Comm_size (MPI_COMM_WORLD, &my_size); + + + if (my_size > 1){ + fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size); + MPI_Finalize(); + } + + + fapl_id = H5Pcreate (H5P_FILE_ACCESS); + H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL); + + event_q = H5EQcreate(fapl_id); + assert(event_q); + + file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q); + assert(file_id); + + + dimsf[0] = NX; + dimsf[1] = NY; + + + dataspaceID = H5Screate_simple(RANK, dimsf, NULL); + dataspace4 = H5Screate_simple(RANK, dimsf, NULL); + + dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q); + assert(dset_id); + + count[0] = 8; + count[1] = 8; + offset[0] = 0; + offset[1] = 0; + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace2 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspaceID, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + data = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data[i] = my_rank + 10; + } + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace2, + dataspaceID, + H5P_DEFAULT, + data, + 0, + event_q); + assert(ret == 0); + + count[0] = 8; + count[1] = 8; + offset[0] = 0; + offset[1] = 0; + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace3 = H5Screate_simple(RANK, count, NULL); + + + H5Sselect_hyperslab(dataspace4, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + data1 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data1[i] = my_rank + 12; + + } + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace3, + dataspace4, + H5P_DEFAULT, + data1, + 0, + event_q); + assert(ret == 0); + + /* assert(H5Tclose_ff(int_id, event_q) == 0);*/ + assert(H5Dclose_ff(dset_id, event_q) == 0); + H5Sclose(dataspaceID); + H5Sclose(dataspace2); + H5Sclose(dataspace3); + H5Sclose(dataspace4); + H5Pclose(fapl_id); + assert(H5Fclose_ff(file_id, event_q) == 0); + H5EQwait(event_q, &num_requests, &status); + free(status); + H5EQclose (event_q); + + free(data); + free(data1); + fprintf(stderr, "\n*****************************************************************************************************************\n"); + fprintf(stderr, "Finalize EFF stack\n"); + fprintf(stderr, "*****************************************************************************************************************\n"); + EFF_finalize(); + MPI_Finalize(); + + return 0; +} diff --git a/test/compactor_write_tests/multiple-writes-test2.c b/test/compactor_write_tests/multiple-writes-test2.c new file mode 100644 index 0000000..d18795e --- /dev/null +++ b/test/compactor_write_tests/multiple-writes-test2.c @@ -0,0 +1,168 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "mpi.h" +#include "hdf5.h" + +#define NX 16 /* dataset dimensions */ +#define NY 8 +#define RANK 2 + +/*Second write -- writes more than first write but overlaps*/ + +int main (int argc, char **argv){ + + const char filename[] = "eff_file.h5"; + hid_t file_id; + hid_t dataspaceID, dataspace2; + hid_t dataspace3, dataspace4; + hid_t dataspace5, dataspace6; + hid_t dset_id; + hid_t fapl_id, dxpl_id; + + const unsigned int nelem = 60; + int *data = NULL, *data1 = NULL; + unsigned int i = 0; + hsize_t dimsf[2]; + hsize_t count[2], offset[2]; + hsize_t s[2], b[2]; + int my_rank, my_size, ret; + int provided; + hid_t event_q, int_id; + int num_requests = 0; + H5_status_t *status = NULL; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(MPI_THREAD_MULTIPLE != provided) { + fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n"); + exit(1); + } + + + EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL); + + MPI_Comm_rank (MPI_COMM_WORLD, &my_rank); + MPI_Comm_size (MPI_COMM_WORLD, &my_size); + + + if (my_size > 1){ + fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size); + MPI_Finalize(); + } + + + fapl_id = H5Pcreate (H5P_FILE_ACCESS); + H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL); + + event_q = H5EQcreate(fapl_id); + assert(event_q); + + file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q); + assert(file_id); + + + dimsf[0] = NX; + dimsf[1] = NY; + + + dataspaceID = H5Screate_simple(RANK, dimsf, NULL); + dataspace4 = H5Screate_simple(RANK, dimsf, NULL); + + dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q); + assert(dset_id); + + count[0] = 4; + count[1] = 8; + offset[0] = 0; + offset[1] = 0; + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace2 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspaceID, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + data = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data[i] = my_rank + 10; + } + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace2, + dataspaceID, + H5P_DEFAULT, + data, + 0, + event_q); + assert(ret == 0); + + count[0] = 8; + count[1] = 8; + offset[0] = 2; + offset[1] = 0; + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace3 = H5Screate_simple(RANK, count, NULL); + + + H5Sselect_hyperslab(dataspace4, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + data1 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data1[i] = my_rank + 12; + + } + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace3, + dataspace4, + H5P_DEFAULT, + data1, + 0, + event_q); + assert(ret == 0); + + /* assert(H5Tclose_ff(int_id, event_q) == 0);*/ + assert(H5Dclose_ff(dset_id, event_q) == 0); + H5Sclose(dataspaceID); + H5Sclose(dataspace2); + H5Sclose(dataspace3); + H5Sclose(dataspace4); + H5Pclose(fapl_id); + assert(H5Fclose_ff(file_id, event_q) == 0); + H5EQwait(event_q, &num_requests, &status); + free(status); + H5EQclose (event_q); + + free(data); + free(data1); + fprintf(stderr, "\n*****************************************************************************************************************\n"); + fprintf(stderr, "Finalize EFF stack\n"); + fprintf(stderr, "*****************************************************************************************************************\n"); + EFF_finalize(); + MPI_Finalize(); + + return 0; +} diff --git a/test/compactor_write_tests/multiple-writes-test3.c b/test/compactor_write_tests/multiple-writes-test3.c new file mode 100644 index 0000000..9fa7170 --- /dev/null +++ b/test/compactor_write_tests/multiple-writes-test3.c @@ -0,0 +1,254 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "mpi.h" +#include "hdf5.h" + +#define NX 16 /* dataset dimensions */ +#define NY 8 +#define RANK 2 + + + +int main (int argc, char **argv){ + + const char filename[] = "eff_file.h5"; + hid_t file_id; + hid_t dataspaceID, dataspace2; + hid_t dataspace3, dataspace4; + hid_t dataspace5, dataspace6; + hid_t dataspace7, dataspace8; + hid_t dset_id; + hid_t fapl_id, dxpl_id; + + const unsigned int nelem = 60; + int *data = NULL, *data1 = NULL; + int *data2 = NULL, *data3 = NULL; + unsigned int i = 0; + hsize_t dimsf[2]; + hsize_t count[2], offset[2]; + hsize_t s[2], b[2]; + int my_rank, my_size, ret; + int provided; + hid_t event_q, int_id; + int num_requests = 0; + H5_status_t *status = NULL; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(MPI_THREAD_MULTIPLE != provided) { + fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n"); + exit(1); + } + + + EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL); + + MPI_Comm_rank (MPI_COMM_WORLD, &my_rank); + MPI_Comm_size (MPI_COMM_WORLD, &my_size); + + + if (my_size > 1){ + fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size); + MPI_Finalize(); + } + + + fapl_id = H5Pcreate (H5P_FILE_ACCESS); + H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL); + + event_q = H5EQcreate(fapl_id); + assert(event_q); + + file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q); + assert(file_id); + + + dimsf[0] = NX; + dimsf[1] = NY; + + + dataspaceID = H5Screate_simple(RANK, dimsf, NULL); + dataspace4 = H5Screate_simple(RANK, dimsf, NULL); + dataspace5 = H5Screate_simple(RANK, dimsf, NULL); + dataspace7 = H5Screate_simple(RANK, dimsf, NULL); + + dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q); + assert(dset_id); + + count[0] = 4; + count[1] = 8; + offset[0] = 0; + offset[1] = 0; + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace2 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspaceID, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + data = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data[i] = my_rank + 10; + } + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace2, + dataspaceID, + H5P_DEFAULT, + data, + 0, + event_q); + assert(ret == 0); + + count[0] = 2; + count[1] = 8; + offset[0] = 2; + offset[1] = 0; + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace3 = H5Screate_simple(RANK, count, NULL); + + + H5Sselect_hyperslab(dataspace4, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + data1 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data1[i] = my_rank + 12; + } + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace3, + dataspace4, + H5P_DEFAULT, + data1, + 0, + event_q); + assert(ret == 0); + + + count[0] = 2; + count[1] = 8; + offset[0] = 6; + offset[1] = 0; + + + data2 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data2[i] = my_rank + 13; + } + + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace6 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspace5, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace6, + dataspace5, + H5P_DEFAULT, + data2, + 0, + event_q); + assert(ret == 0); + + + count[0] = 4; + count[1] = 8; + offset[0] = 3; + offset[1] = 0; + + + data3 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data3[i] = my_rank + 14; + } + + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace8 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspace7, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace8, + dataspace7, + H5P_DEFAULT, + data3, + 0, + event_q); + assert(ret == 0); + + + + + + assert(H5Dclose_ff(dset_id, event_q) == 0); + H5Sclose(dataspaceID); + H5Sclose(dataspace2); + H5Sclose(dataspace3); + H5Sclose(dataspace4); + H5Sclose(dataspace5); + H5Sclose(dataspace6); + H5Pclose(fapl_id); + assert(H5Fclose_ff(file_id, event_q) == 0); + H5EQwait(event_q, &num_requests, &status); + free(status); + H5EQclose (event_q); + + free(data); + free(data1); + fprintf(stderr, "\n*****************************************************************************************************************\n"); + fprintf(stderr, "Finalize EFF stack\n"); + fprintf(stderr, "*****************************************************************************************************************\n"); + EFF_finalize(); + MPI_Finalize(); + + return 0; +} diff --git a/test/compactor_write_tests/multiple-writes-test4.c b/test/compactor_write_tests/multiple-writes-test4.c new file mode 100644 index 0000000..0f32697 --- /dev/null +++ b/test/compactor_write_tests/multiple-writes-test4.c @@ -0,0 +1,254 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "mpi.h" +#include "hdf5.h" + +#define NX 16 /* dataset dimensions */ +#define NY 8 +#define RANK 2 + + + +int main (int argc, char **argv){ + + const char filename[] = "eff_file.h5"; + hid_t file_id; + hid_t dataspaceID, dataspace2; + hid_t dataspace3, dataspace4; + hid_t dataspace5, dataspace6; + hid_t dataspace7, dataspace8; + hid_t dset_id; + hid_t fapl_id, dxpl_id; + + const unsigned int nelem = 60; + int *data = NULL, *data1 = NULL; + int *data2 = NULL, *data3 = NULL; + unsigned int i = 0; + hsize_t dimsf[2]; + hsize_t count[2], offset[2]; + hsize_t s[2], b[2]; + int my_rank, my_size, ret; + int provided; + hid_t event_q, int_id; + int num_requests = 0; + H5_status_t *status = NULL; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(MPI_THREAD_MULTIPLE != provided) { + fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n"); + exit(1); + } + + + EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL); + + MPI_Comm_rank (MPI_COMM_WORLD, &my_rank); + MPI_Comm_size (MPI_COMM_WORLD, &my_size); + + + if (my_size > 1){ + fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size); + MPI_Finalize(); + } + + + fapl_id = H5Pcreate (H5P_FILE_ACCESS); + H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL); + + event_q = H5EQcreate(fapl_id); + assert(event_q); + + file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q); + assert(file_id); + + + dimsf[0] = NX; + dimsf[1] = NY; + + + dataspaceID = H5Screate_simple(RANK, dimsf, NULL); + dataspace4 = H5Screate_simple(RANK, dimsf, NULL); + dataspace5 = H5Screate_simple(RANK, dimsf, NULL); + dataspace7 = H5Screate_simple(RANK, dimsf, NULL); + + dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q); + assert(dset_id); + + count[0] = 4; + count[1] = 8; + offset[0] = 0; + offset[1] = 0; + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace2 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspaceID, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + data = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data[i] = my_rank + 10; + } + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace2, + dataspaceID, + H5P_DEFAULT, + data, + 0, + event_q); + assert(ret == 0); + + count[0] = 2; + count[1] = 8; + offset[0] = 2; + offset[1] = 0; + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace3 = H5Screate_simple(RANK, count, NULL); + + + H5Sselect_hyperslab(dataspace4, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + data1 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data1[i] = my_rank + 12; + } + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace3, + dataspace4, + H5P_DEFAULT, + data1, + 0, + event_q); + assert(ret == 0); + + + count[0] = 2; + count[1] = 8; + offset[0] = 6; + offset[1] = 0; + + + data2 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data2[i] = my_rank + 13; + } + + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace6 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspace5, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace6, + dataspace5, + H5P_DEFAULT, + data2, + 0, + event_q); + assert(ret == 0); + + + count[0] = 8; + count[1] = 8; + offset[0] = 2; + offset[1] = 0; + + + data3 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data3[i] = my_rank + 14; + } + + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace8 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspace7, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace8, + dataspace7, + H5P_DEFAULT, + data3, + 0, + event_q); + assert(ret == 0); + + + + + + assert(H5Dclose_ff(dset_id, event_q) == 0); + H5Sclose(dataspaceID); + H5Sclose(dataspace2); + H5Sclose(dataspace3); + H5Sclose(dataspace4); + H5Sclose(dataspace5); + H5Sclose(dataspace6); + H5Pclose(fapl_id); + assert(H5Fclose_ff(file_id, event_q) == 0); + H5EQwait(event_q, &num_requests, &status); + free(status); + H5EQclose (event_q); + + free(data); + free(data1); + fprintf(stderr, "\n*****************************************************************************************************************\n"); + fprintf(stderr, "Finalize EFF stack\n"); + fprintf(stderr, "*****************************************************************************************************************\n"); + EFF_finalize(); + MPI_Finalize(); + + return 0; +} diff --git a/test/compactor_write_tests/multiple-writes-test5.c b/test/compactor_write_tests/multiple-writes-test5.c new file mode 100644 index 0000000..2cea44b --- /dev/null +++ b/test/compactor_write_tests/multiple-writes-test5.c @@ -0,0 +1,254 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "mpi.h" +#include "hdf5.h" + +#define NX 16 /* dataset dimensions */ +#define NY 8 +#define RANK 2 + + + +int main (int argc, char **argv){ + + const char filename[] = "eff_file.h5"; + hid_t file_id; + hid_t dataspaceID, dataspace2; + hid_t dataspace3, dataspace4; + hid_t dataspace5, dataspace6; + hid_t dataspace7, dataspace8; + hid_t dset_id; + hid_t fapl_id, dxpl_id; + + const unsigned int nelem = 60; + int *data = NULL, *data1 = NULL; + int *data2 = NULL, *data3 = NULL; + unsigned int i = 0; + hsize_t dimsf[2]; + hsize_t count[2], offset[2]; + hsize_t s[2], b[2]; + int my_rank, my_size, ret; + int provided; + hid_t event_q, int_id; + int num_requests = 0; + H5_status_t *status = NULL; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(MPI_THREAD_MULTIPLE != provided) { + fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n"); + exit(1); + } + + + EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL); + + MPI_Comm_rank (MPI_COMM_WORLD, &my_rank); + MPI_Comm_size (MPI_COMM_WORLD, &my_size); + + + if (my_size > 1){ + fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size); + MPI_Finalize(); + } + + + fapl_id = H5Pcreate (H5P_FILE_ACCESS); + H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL); + + event_q = H5EQcreate(fapl_id); + assert(event_q); + + file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q); + assert(file_id); + + + dimsf[0] = NX; + dimsf[1] = NY; + + + dataspaceID = H5Screate_simple(RANK, dimsf, NULL); + dataspace4 = H5Screate_simple(RANK, dimsf, NULL); + dataspace5 = H5Screate_simple(RANK, dimsf, NULL); + dataspace7 = H5Screate_simple(RANK, dimsf, NULL); + + dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q); + assert(dset_id); + + count[0] = 4; + count[1] = 4; + offset[0] = 0; + offset[1] = 0; + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace2 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspaceID, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + data = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data[i] = my_rank + 10; + } + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace2, + dataspaceID, + H5P_DEFAULT, + data, + 0, + event_q); + assert(ret == 0); + + count[0] = 4; + count[1] = 4; + offset[0] = 6; + offset[1] = 0; + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace3 = H5Screate_simple(RANK, count, NULL); + + + H5Sselect_hyperslab(dataspace4, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + data1 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data1[i] = my_rank + 12; + } + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace3, + dataspace4, + H5P_DEFAULT, + data1, + 0, + event_q); + assert(ret == 0); + + + count[0] = 4; + count[1] = 4; + offset[0] = 3; + offset[1] = 0; + + + data2 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data2[i] = my_rank + 13; + } + + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace6 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspace5, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace6, + dataspace5, + H5P_DEFAULT, + data2, + 0, + event_q); + assert(ret == 0); + + + count[0] = 8; + count[1] = 5; + offset[0] = 0; + offset[1] = 3; + + + data3 = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data3[i] = my_rank + 14; + } + + + + fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n", + my_rank, + count[0], count[1], offset[0], offset[1]); + + dataspace8 = H5Screate_simple(RANK, count, NULL); + + H5Sselect_hyperslab(dataspace7, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace8, + dataspace7, + H5P_DEFAULT, + data3, + 0, + event_q); + assert(ret == 0); + + + + + + assert(H5Dclose_ff(dset_id, event_q) == 0); + H5Sclose(dataspaceID); + H5Sclose(dataspace2); + H5Sclose(dataspace3); + H5Sclose(dataspace4); + H5Sclose(dataspace5); + H5Sclose(dataspace6); + H5Pclose(fapl_id); + assert(H5Fclose_ff(file_id, event_q) == 0); + H5EQwait(event_q, &num_requests, &status); + free(status); + H5EQclose (event_q); + + free(data); + free(data1); + fprintf(stderr, "\n*****************************************************************************************************************\n"); + fprintf(stderr, "Finalize EFF stack\n"); + fprintf(stderr, "*****************************************************************************************************************\n"); + EFF_finalize(); + MPI_Finalize(); + + return 0; +} diff --git a/test/compactor_write_tests/parallel_write.c b/test/compactor_write_tests/parallel_write.c new file mode 100644 index 0000000..e996388 --- /dev/null +++ b/test/compactor_write_tests/parallel_write.c @@ -0,0 +1,126 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include "mpi.h" +#include "hdf5.h" + +#define NX 8 /* dataset dimensions */ +#define NY 8 +#define RANK 2 + + + +int main (int argc, char **argv){ + + const char filename[] = "eff_file.h5"; + hid_t file_id; + hid_t dataspaceID, dataspace2; + hid_t dset_id; + hid_t fapl_id, dxpl_id; + + const unsigned int nelem = 60; + int *data = NULL; + unsigned int i = 0; + hsize_t dimsf[2]; + hsize_t count[2], offset[2]; + + int my_rank, my_size, ret; + int provided; + hid_t event_q, int_id; + + int num_requests = 0; + H5_status_t *status = NULL; + + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + if(MPI_THREAD_MULTIPLE != provided) { + fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n"); + exit(1); + } + + EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL); + + MPI_Comm_rank (MPI_COMM_WORLD, &my_rank); + MPI_Comm_size (MPI_COMM_WORLD, &my_size); + + fprintf(stderr, "APP processes = %d, my rank is %d\n", my_size, my_rank); + + fapl_id = H5Pcreate (H5P_FILE_ACCESS); + H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL); + + event_q = H5EQcreate(fapl_id); + assert(event_q); + + file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q); + assert(file_id); + + + dimsf[0] = NX; + dimsf[1] = NY; + + /* int_id = H5Tcopy(H5T_STD_I32LE); + ret = H5Tcommit_ff(file_id, "int", int_id, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, + 0, event_q); + + assert(ret == 0);*/ + + dataspaceID = H5Screate_simple(RANK, dimsf, NULL); + + dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q); + assert(dset_id); + + + + count[0] = NY/my_size; + count[1] = 8; + offset[0] = my_rank * count[0]; + offset[1] = 0; + + dataspace2 = H5Screate_simple(RANK, count, NULL); + + + H5Sselect_hyperslab(dataspaceID, + H5S_SELECT_SET, + offset, + NULL, + count, + NULL); + + + data = (int *) malloc(sizeof(int)*count[0]*count[1]); + for (i=0; i < count[0]*count[1]; i++) { + data[i] = my_rank + 10; + } + + + ret = H5Dwrite_ff(dset_id, + H5T_NATIVE_INT, + dataspace2, + dataspaceID, + H5P_DEFAULT, + data, + 0, + event_q); + assert(ret == 0); + + + + assert(H5Dclose_ff(dset_id, event_q) == 0); + H5Sclose(dataspaceID); + H5Sclose(dataspace2); + H5Pclose(fapl_id); + assert(H5Fclose_ff(file_id, event_q) == 0); + H5EQwait(event_q, &num_requests, &status); + free(status); + H5EQclose (event_q); + + free(data); + fprintf(stderr, "\n*****************************************************************************************************************\n"); + fprintf(stderr, "Finalize EFF stack\n"); + fprintf(stderr, "*****************************************************************************************************************\n"); + EFF_finalize(); + MPI_Finalize(); + + return 0; +} diff --git a/test/compactor_write_tests/test_server.c b/test/compactor_write_tests/test_server.c new file mode 100644 index 0000000..5d71e35 --- /dev/null +++ b/test/compactor_write_tests/test_server.c @@ -0,0 +1,48 @@ +/*
+ * test_server.c: Server side of Milestone 3.3 Asynchronous I/O and initial
+ * IOD VOL plugin demonstration.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+int main(int argc, char **argv) {
+ int my_size, my_rank;
+ int provided;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ printf("MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+ MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &my_size);
+ printf("Number of server processes = %d, my rank is %d\n", my_size, my_rank);
+
+ H5open();
+ /* This call initiliazes the FS for the server processes (create metadata and
+ bulk data handles). It also registers with the Function shipper the
+ HDF5 VOL server routines that will be executed when the clients ship the VOL
+ routines. Then it executes a loop to receive requests from clients that map
+ to one of the registered callbacks.
+
+ Whenever a request is received, the corresponding callback is called which
+ inserts the operation into the AXE (the Asynchronous eXecution Engine) and
+ returns to receive another request. The AXE schedules that operation to
+ execute by assigning a thread to it. After the operation is complete, the
+ result is returned to the client through the function shipper complete call.
+
+ Finally, when all clients send a terminate call, the function shipper interface
+ is finalized the operation returns. */
+ H5VLiod_start_handler(MPI_COMM_WORLD, MPI_INFO_NULL);
+ H5close();
+ MPI_Finalize();
+
+ return 0;
+}
+
|