summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishwanath Venkatesan <vish@hdfgroup.org>2013-07-25 19:22:22 (GMT)
committerVishwanath Venkatesan <vish@hdfgroup.org>2013-07-25 19:22:22 (GMT)
commit315e2305153d0b67dd78e119db694accfcce85a3 (patch)
tree56b499c4288e7c5c35a22addd7b45e7ec2d636c0
parent89f2fd84bd4474cdff15cbc7f0e5b7364b251bfe (diff)
downloadhdf5-315e2305153d0b67dd78e119db694accfcce85a3.zip
hdf5-315e2305153d0b67dd78e119db694accfcce85a3.tar.gz
hdf5-315e2305153d0b67dd78e119db694accfcce85a3.tar.bz2
[svn-r23933] 1. Overlapping writes fixes for more complicated scenarios handled.
2. Tests for testing compactor!
-rw-r--r--src/H5VLiod_compactor.c273
-rw-r--r--src/H5VLiod_dset.c89
-rw-r--r--src/H5VLiod_server.c16
-rw-r--r--test/compactor_write_tests/Makefile12
-rw-r--r--test/compactor_write_tests/multiple-writes-test1.c170
-rw-r--r--test/compactor_write_tests/multiple-writes-test2.c168
-rw-r--r--test/compactor_write_tests/multiple-writes-test3.c254
-rw-r--r--test/compactor_write_tests/multiple-writes-test4.c254
-rw-r--r--test/compactor_write_tests/multiple-writes-test5.c254
-rw-r--r--test/compactor_write_tests/parallel_write.c126
-rw-r--r--test/compactor_write_tests/test_server.c48
11 files changed, 1490 insertions, 174 deletions
diff --git a/src/H5VLiod_compactor.c b/src/H5VLiod_compactor.c
index 6ad92fe..997f054 100644
--- a/src/H5VLiod_compactor.c
+++ b/src/H5VLiod_compactor.c
@@ -79,10 +79,20 @@ static size_t H5VL_iod_copy_desc (block_container_t *sm_block,
size_t start,
size_t j);
+static size_t H5VL_iod_copy_desc_reduced (block_container_t *sm_block,
+ block_container_t *tmp_block,
+ size_t start, size_t counter,
+ size_t j);
+
static void H5VL_print_block_container (block_container_t *cont,
size_t num);
+
+static int H5VL_check_overlapped_offsets(hsize_t start_i, hsize_t start_j,
+ hsize_t end_i, hsize_t end_j);
+
+
/*---------------------------------------------------------------------*/
@@ -475,9 +485,12 @@ int H5VL_iod_create_request_list (compactor *queue, request_list_t **list,
fprintf(stderr,"%s:%d N-entires : %d\n",
__FILE__,__LINE__, request_id);
fprintf(stderr,"%s:%d Compactor Request List \n",__FILE__,__LINE__);
- fprintf(stderr,"Compactor-TUPLE: id -- AXE-id -- dataset -- dataspace -- fileblocks -- memblocks -- num_peers\n");
+ fprintf(stderr,
+ "Compactor-TUPLE: id -- AXE-id -- dataset -- dataspace \
+ -- fileblocks -- memblocks -- num_peers\n");
for ( i = 0; i < request_id; i++){
- fprintf(stderr, "Compactor-Entry: %d -- %llu -- %d -- %d -- %zd -- %zd -- %d\n",
+ fprintf(stderr,
+ "Compactor-Entry: %d -- %llu -- %d -- %d -- %zd -- %zd -- %d\n",
newlist[i].request_id,
newlist[i].axe_id,
newlist[i].dataset_id,
@@ -746,6 +759,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests,
fprintf (stderr,"%s:%d Merged with : %d\n", __FILE__, __LINE__,
list[request_list[1]].selection_id);
#endif
+
merge_flag = 0;
m_elmnt_size = list[request_list[0]].elementsize;
if ( 0 == H5VL_iod_select_overlap ( current_space,
@@ -894,6 +908,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests,
free(lselected_req);
lselected_req = NULL;
}
+#if DEBUG_COMPACTOR
for (i = 0; i < original_requests; i ++){
fprintf(stderr,"%s:%d, Request :%d --> %d\n",
__FILE__,
@@ -901,7 +916,7 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests,
i,
list[i].merged);
}
-
+#endif
*total_requests = original_requests;
done:
@@ -909,6 +924,25 @@ int H5VL_iod_compact_requests (request_list_t *list, int *total_requests,
}/*end H5VL_iod_compact_requests*/
+static int H5VL_check_overlapped_offsets(hsize_t start_i, hsize_t start_j,
+ hsize_t end_i, hsize_t end_j){
+
+ if (start_i < start_j){
+ if (start_j >= end_i){
+ return 0;
+ }
+ return 1;
+ }
+ else if (start_i > start_j){
+ if (start_i >= end_j){
+ return 0;
+ }
+ return 1;
+ }
+ else
+ return 1;
+}
+
/*-------------------------------------------------------------------------
* Function: H5VL_iod_reconstruct_overlapped_request
*
@@ -937,26 +971,38 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
size_t rev_mblks, rev_fblks;
block_container_t *rev_sm_block=NULL, *rev_sf_block=NULL;
int ret_value = CP_SUCCESS;
+ hsize_t start_i , start_j, end_i, end_j;
int changed_cnt = 0, ii;
FUNC_ENTER_NOAPI(NULL)
rev_mblks = 0;
rev_fblks = 0;
- rev_sm_block = (block_container_t *) malloc (4 * sizeof (block_container_t));
- if (NULL == rev_sm_block)
- HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sm_block");
- rev_sf_block = (block_container_t *) malloc (4 * sizeof (block_container_t));
- if (NULL == rev_sf_block)
- HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sf_block");
+#if DEBUG_COMPACTOR
+ fprintf (stderr,
+ "in %s:%d sf[%zd].offset : %lli, sf[%zd].len: %zd\n",
+ __FILE__, __LINE__,i,sf_block[i].offset,i,sf_block[i].len);
+ fprintf (stderr,
+ "in %s:%d sf[%zd].offset: %lli, sf[%zd].len : %zd\n",
+ __FILE__,__LINE__,j,sf_block[j].offset,j,sf_block[j].len);
+#endif
+
+ start_i = sf_block[i].offset;
+ start_j = sf_block[j].offset;
+ end_i = sf_block[i].offset + sf_block[i].len;
+ end_j = sf_block[j].offset + sf_block[j].len;
+
+ if(H5VL_check_overlapped_offsets(start_i, start_j,
+ end_i, end_j)){
+
+ rev_sm_block = (block_container_t *) malloc (4 * sizeof (block_container_t));
+ if (NULL == rev_sm_block)
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sm_block");
+ rev_sf_block = (block_container_t *) malloc (4 * sizeof (block_container_t));
+ if (NULL == rev_sf_block)
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for rev_sf_block");
- fprintf (stderr, "sf[%zd].offset : %lli, sf[%zd].len: %zd, sf[%zd].offset: %lli, sf[%zd].len : %zd\n",
- i,sf_block[i].offset,i,sf_block[i].len,j,sf_block[j].offset,j,sf_block[j].len);
-
- if(sf_block[i].offset + sf_block[i].len >
- sf_block[j].offset){
-
if (sf_block[i].offset + sf_block[i].len <=
sf_block[j].offset + sf_block[j].len ){
/*This means the latest write entry starts in-between
@@ -976,11 +1022,9 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
changed_cnt++;
}
else{
- /* j's start offset is different than I but in total they
- write the same or j write more.. so modify i's len..
- the offsets still remain untouched!*/
- rem_len = (size_t)(sf_block[i].offset + sf_block[i].len)
- - sf_block[j].offset;
+ /*j's start offset is greater than
+ i's + */
+ rem_len = (size_t)(sf_block[j].offset - sf_block[i].offset);
rev_sf_block[rev_fblks].offset = sf_block[i].offset;
rev_sm_block[rev_mblks].offset = sm_block[i].offset;
rev_sf_block[rev_fblks].len = rem_len;
@@ -1039,7 +1083,8 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
rev_sf_block[rev_fblks].offset =
sf_block[j].offset + sf_block[j].len;
rev_sm_block[rev_mblks].offset =
- sm_block[i].offset + ((sf_block[j].offset +sf_block[j].len) - sf_block[i].offset);
+ sm_block[i].offset +
+ ((sf_block[j].offset +sf_block[j].len) - sf_block[i].offset);
rev_sf_block[rev_fblks].len = rem_len;
rev_sm_block[rev_mblks].len = rem_len;
changed[changed_cnt] = i;
@@ -1054,6 +1099,7 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
}
else{
+
rem_len = (sf_block[j].offset + sf_block[j].len)
- sf_block[i].offset;
rev_sf_block[rev_fblks].offset =
@@ -1095,8 +1141,10 @@ int H5VL_iod_reconstruct_overlapped_request (block_container_t *sf_block,
fprintf (stderr,"rev_fblks: %zd\n",
rev_fblks);
for (jj = 0; jj < rev_fblks; jj++){
- fprintf (stderr, "rev_blks[%zd]: %lli, rev_blks[%zd]: %zd\n",
- jj, rev_sf_block[jj].offset, jj, rev_sf_block[jj].len);
+ fprintf (stderr,
+ "rev_blks[%zd]: %lli, rev_blks[%zd]: %zd\n",
+ jj, rev_sf_block[jj].offset, jj,
+ rev_sf_block[jj].len);
}
fflush(stderr);
#endif
@@ -1218,7 +1266,6 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
sm_block[blck_cnt].offset = list[request_list[i]].mblocks[j].offset;
sm_block[blck_cnt].len = list[request_list[i]].mblocks[j].len;
axe_id_list[blck_cnt] = list[request_list[i]].axe_id;
-
#if DEBUG_COMPACTOR
fprintf(stderr, "in %s:%d foffset: %lli len: %zd, moffset: %lli, len: %zd\n",
__FILE__, __LINE__,
@@ -1233,10 +1280,11 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
assert(blck_cnt == fblks);
loop_blks = fblks;
+
+ /* =============================================================================================*/
/* If its the same client, we do allow overlaps*/
/* In that case its assumed that the requests flow in temporally
So we can replace the memory buffer with the latest request */
-
if (overlap_flag){
rev_mblks = 0;
@@ -1268,22 +1316,25 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
ii, j,
changed,
&changed_cnt)){
- HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, CP_FAIL, "overlapped request construct error\n");
+ HGOTO_ERROR(H5E_SYM, H5E_CANTINIT, CP_FAIL,
+ "overlapped request construct error\n");
}
if (changed_cnt == 0){
#if DEBUG_COMPACTOR
fprintf(stderr,
- "Continuing no change with that combination.. the offsets remain as follows: \n");
- H5VL_print_block_container (f_block, loop_blks);
- H5VL_print_block_container (m_block, loop_blks);
+ "Continuing no change with that combination..\n");
+ /* H5VL_print_block_container (f_block, loop_blks);
+ H5VL_print_block_container (m_block, loop_blks); */
#endif
continue;
}
#if DEBUG_COMPACTOR
fprintf (stderr,
- "in %s:%d Completed reconstructing overlapped request changed: %zd fblks: %zd, mblks: %zd\n",
+ "in %s:%d Completed reconstructing request\n",
+ __FILE__, __LINE__);
+ fprintf(stderr,"in %s:%d changed: %zd fblks: %zd, mblks: %zd\n",
__FILE__, __LINE__, changed_cnt ,rev_fblks, rev_mblks);
for (k = 0; k < rev_fblks; k++){
fprintf (stderr, "%zd: OFFSET: %lli, LEN: %zd\n",
@@ -1291,74 +1342,58 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
}
#endif
- tmp_block = (block_container_t *) malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t));
+ tmp_block =
+ (block_container_t *)
+ malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t));
if (NULL == tmp_block){
- HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for tmp_block");
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,
+ "Allocation error for tmp_block");
}
- tmpf_block = (block_container_t *) malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t));
+ tmpf_block =
+ (block_container_t *)
+ malloc ((fblks+(rev_fblks - 2)) * sizeof(block_container_t));
if (NULL == tmpf_block){
- HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for tmpf_block");
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,
+ "Allocation error for tmpf_block");
}
new_blks = 0;
switch(changed_cnt){
case 1:
if (changed[0] == (int)ii){
-#if DEBUG_COMPACTOR
- fprintf (stderr,"Enters CASE 1 with changed_cnt : %d\n",
- changed_cnt);
-#endif
-
H5VL_iod_copy_desc(f_block, tmpf_block, 0, ii);
new_blks += H5VL_iod_copy_desc(m_block, tmp_block, 0,ii);
-
-#if DEBUG_COMPACTOR
- fprintf (stderr,"***** new_blks :%zd\n", new_blks);
H5VL_print_block_container (tmpf_block, new_blks);
- H5VL_print_block_container (tmp_block, new_blks);
-#endif
tmpf_block[new_blks].offset = tf_block[0].offset;
tmpf_block[new_blks].len = tf_block[0].len;
tmp_block[new_blks].offset = tm_block[0].offset;
tmp_block[new_blks].len = tm_block[0].len;
new_blks++;
-
-#if DEBUG_COMPACTOR
H5VL_print_block_container (tmpf_block, new_blks);
- H5VL_print_block_container (tmp_block, new_blks);
-
- fprintf (stderr, "ii+1 : %zd, j: %zd\n", ii+1, j);
-#endif
new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, ii+1, j);
H5VL_iod_copy_desc(m_block, tmp_block, ii+1, j);
-
+ H5VL_print_block_container (tmpf_block, new_blks);
+ H5VL_iod_copy_desc_reduced(f_block, tmpf_block,
+ j+1, new_blks,
+ loop_blks);
+ new_blks += H5VL_iod_copy_desc_reduced(m_block,
+ tmp_block,
+ j+1, new_blks,
+ loop_blks);
loop_blks -= 1;
-
- H5VL_iod_copy_desc(f_block, tmpf_block, j+1, loop_blks);
- H5VL_iod_copy_desc(m_block, tmp_block, j+1, loop_blks);
-#if DEBUG_COMPACTOR
- H5VL_print_block_container (tmpf_block, loop_blks);
- H5VL_print_block_container (tmp_block, loop_blks);
-#endif
- ii -= 1;
+ H5VL_print_block_container (tmpf_block, new_blks);
+ fprintf (stderr, "new_blks: %zd, loop_blks: %zd\n",
+ new_blks, loop_blks);
+ if (!((int)(ii - 1) < 0))
+ ii -= 1;
}
break;
case 2:
-#if DEBUG_COMPACTOR
- fprintf (stderr,"Enters CASE 2 with changed_cnt : %d\n",
- changed_cnt);
-#endif
-
for (i = 0; i < 2; i++){
if (changed[i] == (int)ii){
new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, 0, ii);
H5VL_iod_copy_desc(m_block, tmp_block, 0, ii);
-#if DEBUG_COMPACTOR
- fprintf (stderr,"***** new_blks :%zd\n", new_blks);
- H5VL_print_block_container (tmpf_block, new_blks);
- H5VL_print_block_container (tmp_block, new_blks);
-#endif
tmpf_block[new_blks].offset = tf_block[i].offset;
tmpf_block[new_blks].len = tf_block[i].len;
tmp_block[new_blks].offset = tm_block[i].offset;
@@ -1385,22 +1420,14 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
case 3:
for (i = 0; i < 3; i ++){
if (changed[i] == (int)ii){
-#if DEBUG_COMPACTOR
- fprintf(stderr,"in Changed CASE 3 cnt: %zd and changed[%d]: %d, loop_blck: %zd\n",
- changed_cnt,i,changed[i], loop_blks );
-
-#endif
new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, 0, ii);
H5VL_iod_copy_desc(m_block, tmp_block, 0, ii);
-#if DEBUG_COMPACTOR
- H5VL_print_block_container (tmpf_block, new_blks);
- H5VL_print_block_container (tmp_block, new_blks);
-#endif
tmpf_block[new_blks].offset = tf_block[i].offset;
tmpf_block[new_blks].len = tf_block[i].len;
tmp_block[new_blks].offset = tm_block[i].offset;
tmp_block[new_blks].len = tm_block[i].len;
+
new_blks++;
i++;
loop_blks += 1;
@@ -1409,19 +1436,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
tmp_block[new_blks].offset = tm_block[i].offset;
tmp_block[new_blks].len = tm_block[i].len;
new_blks++;
-#if DEBUG_COMPACTOR
- fprintf (stderr, "*********** loop_blks: %zd, newblks: %zd\n",
- loop_blks, new_blks);
- H5VL_print_block_container (tmpf_block, new_blks);
- H5VL_print_block_container (tmp_block, new_blks);
-#endif
-
}
else{
-#if DEBUG_COMPACTOR
- fprintf(stderr,"in %s:%d Changed CASE 3 with Changed_cnt: %zd and changed[%d]: %d\n",
- __FILE__, __LINE__,changed_cnt,i,changed[i] );
-#endif
if (ii+1 < j){
new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, ii+1, j);
H5VL_iod_copy_desc(m_block, tmp_block, ii+1, j);
@@ -1431,23 +1447,13 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
tmp_block[new_blks].offset = tm_block[i].offset;
tmp_block[new_blks].len = tm_block[i].len;
new_blks++;
-#if DEBUG_COMPACTOR
- fprintf (stderr, "*********** loop_blks: %zd, newblks: %zd, j: %zd\n",
- loop_blks, new_blks, j);
- H5VL_print_block_container (tmpf_block, new_blks);
- H5VL_print_block_container (tmp_block, new_blks);
-#endif
-
if (new_blks < loop_blks){
- new_blks += H5VL_iod_copy_desc(f_block, tmpf_block, j+1, loop_blks);
+ new_blks +=
+ H5VL_iod_copy_desc(f_block, tmpf_block, j+1, loop_blks);
H5VL_iod_copy_desc(m_block, tmp_block, j+1, loop_blks);
}
}
}
-#if DEBUG_COMPACTOR
- fprintf (stderr, "Total revised blocks : %zd \n",
- new_blks);
-#endif
break;
default:
ret_value = CP_FAIL;
@@ -1456,17 +1462,10 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
if (new_blks < loop_blks){
H5VL_iod_copy_desc(f_block, tmpf_block, new_blks, loop_blks);
- new_blks += H5VL_iod_copy_desc(m_block, tmp_block, new_blks, loop_blks);
+ new_blks +=
+ H5VL_iod_copy_desc(m_block, tmp_block, new_blks, loop_blks);
}
- if (NULL != m_block){
- free(m_block);
- m_block = NULL;
- }
- if (NULL != f_block){
- free(f_block);
- f_block = NULL;
- }
if (NULL != tf_block){
free(tf_block);
tf_block = NULL;
@@ -1475,14 +1474,27 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
free(tm_block);
tm_block = NULL;
}
+
+ if (NULL != m_block){
+ free(m_block);
+ m_block = NULL;
+ }
+ if (NULL != f_block){
+ free(f_block);
+ f_block = NULL;
+ }
f_block = tmpf_block;
m_block = tmp_block;
+
}
#if DEBUG_COMPACTOR
- fprintf (stderr,"***********new_blks: %zd loop_blks: %zd**********\n",
- new_blks, loop_blks);
+ fprintf (stderr,
+ "***********AFTER ITERATION i: %zd, j: %zd**************\n",
+ ii, j);
H5VL_print_block_container (f_block, loop_blks);
H5VL_print_block_container (m_block, loop_blks);
+ fprintf (stderr,
+ "########################################################\n");
#endif
}
@@ -1508,7 +1520,8 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
sorted = (int *) malloc (blck_cnt * sizeof(int));
if ( NULL == sorted ){
- HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,"Allocation error for sorted array");
+ HGOTO_ERROR(H5E_HEAP, H5E_CANTALLOC, CP_FAIL,
+ "Allocation error for sorted array");
}
@@ -1516,11 +1529,12 @@ int H5VL_iod_construct_merged_request (request_list_t *list,
#if DEBUG_COMPACTOR
- fprintf (stderr, "********************* AFTER Adjusting for overlap **************\n");
-
+ fprintf (stderr,
+ "********************* AFTER Adjusting for overlap **************\n");
for ( j = 0; j < blck_cnt; j++){
- fprintf(stderr, "in %s:%d sorted_foffset: %lli sorted_len: %zd, sorted_moffset: %lli, sorted_mlen: %zd\n",
- __FILE__, __LINE__,
+ fprintf(stderr,
+ "sorted_foffset: %lli sorted_len: %zd,\
+ sorted_moffset: %lli, sorted_mlen: %zd\n",
sf_block[sorted[j]].offset, sf_block[sorted[j]].len, sm_block[sorted[j]].offset,
sm_block[sorted[j]].len);
}
@@ -1637,11 +1651,13 @@ static void H5VL_print_block_container (block_container_t *cont,
size_t num){
size_t k;
+#if DEBUG_COMPACTOR
for (k = 0; k < num; k++){
fprintf (stderr, "%zd: block.offset: %lli, block.len: %zd \n",
k,
cont[k].offset, cont[k].len);
}
+#endif
}
@@ -1652,15 +1668,30 @@ static size_t H5VL_iod_copy_desc (block_container_t *sm_block,
size_t j){
size_t i, cnt = 0;
-#if DEBUG_COMPACTOR
- fprintf (stderr,"start: %zd, end: %zd\n",
- start, j);
-#endif
+
for (i = start; i < j; i++){
tmp_block[i].offset = sm_block[i].offset;
tmp_block[i].len = sm_block[i].len;
cnt++;
}
+
+ return cnt;
+}
+
+static size_t H5VL_iod_copy_desc_reduced (block_container_t *sm_block,
+ block_container_t *tmp_block,
+ size_t start, size_t counter,
+ size_t j){
+
+ size_t i, cnt = 0;
+
+ for (i = start; i < j; i++){
+ tmp_block[counter].offset = sm_block[i].offset;
+ tmp_block[counter].len = sm_block[i].len;
+ counter++;
+ cnt++;
+ }
+
return cnt;
}
diff --git a/src/H5VLiod_dset.c b/src/H5VLiod_dset.c
index 2d67178..2d88976 100644
--- a/src/H5VLiod_dset.c
+++ b/src/H5VLiod_dset.c
@@ -768,6 +768,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
fprintf (stderr,"COMPACTOR WRITE: Request %d has not been merged \n", request_counter+1);
size = list[request_counter].mem_length;
buf = (void *)list[request_counter].mem_buf;
+
#if DEBUG_COMPACTOR
ptr = (int *) buf;
fprintf(stderr,"COMPACTOR WRITE: Received a buffer of size %zd with values :",
@@ -789,11 +790,12 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
space_id = list[request_counter].selection_id;
dst_id = input->dset_type_id;
+#if DEBUG_COMPACTOR
fprintf (stderr,"space_id: %d, selection_id: %d\n",
space_id, list[request_counter].selection_id);
-
/*Even in the case its not merged the buffer was already extracted*/
fprintf (stderr,"COMPACTOR WRITE: Request %d has been merged \n", request_counter+1);
+#endif
}
dst_size = H5Tget_size(dst_id);
@@ -813,10 +815,11 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
if(H5VL_iod_get_file_desc(space_id, &num_descriptors, NULL) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_CANTGET, FAIL, "unable to generate IOD file descriptor from dataspace selection");
+#if DEBUG_COMPACTOR
fprintf (stderr,"COMPACTOR num_descriptors : %llu, ndims: %d\n",
num_descriptors,
ndims);
-
+#endif
/* allocate the IOD hyperslab descriptors needed */
if(NULL == (hslabs = (iod_hyperslab_t *)malloc
(sizeof(iod_hyperslab_t) * (size_t)num_descriptors)))
@@ -849,7 +852,7 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
num_bytes = 0;
num_elems = 1;
-
+
/* determine how many bytes the current descriptor holds */
for(i=0 ; i<ndims ; i++)
num_elems *= (hslabs[n].count[i] * hslabs[n].block[i]);
@@ -874,22 +877,22 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
if (list[request_counter].merged == MERGED){
#if DEBUG_COMPACTOR
- fprintf (stderr, "COMPACTOR WRITE i: %d, num_mblocks: %zd, num_bytes: %lli\n",
- request_counter, list[request_counter].num_mblocks, num_bytes);
- for ( j = 0; j < list[request_counter].num_mblocks; j++){
+ fprintf (stderr, "COMPACTOR WRITE i: %d, num_mblocks: %zd, num_bytes: %lli, start_reqs: %lli\n",
+ request_counter, list[request_counter].num_mblocks, num_bytes, start_reqs);
+ for ( j = curr_j; j < list[request_counter].num_mblocks; j++){
fprintf (stderr,"COMPACTOR WRITE j: %lli, len: %zd\n",
j, list[request_counter].mblocks[j].len);
}
#endif
k = 0;
- mem_reqs = 0; start_reqs = curr_j;
+ mem_reqs = 0;
fprintf (stderr,"COMPACTOR, curr_j: %lli, j: %lli, num_mblocks: %zd\n",
curr_j, j, list[request_counter].num_mblocks);
+ start_reqs = curr_j;
for (j = curr_j; j < list[request_counter].num_mblocks; j++){
-
- if (k < num_bytes){
+ if (k < num_bytes){
if (bytes_left){
k += bytes_left;
mem_reqs += 1;
@@ -911,57 +914,49 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
}
}
-#if 0
- fprintf (stderr,
- "COMPACTOR WRITE k: %lli mem_reqs: %lli start_reqs: %lli, curr_j: %lli\n",
- k,
- mem_reqs,
- start_reqs,
- curr_j);
-#endif
+ curr_j += 1;
+ fprintf (stderr,"COMPACTOR WRITE start_reqs: %lli, mem_reqs: %lli\n",
+ start_reqs, mem_reqs);
/*Determined the number of entries in the memory block for this hslab
descriptor*/
- mem_desc = (iod_mem_desc_t *) malloc (sizeof(iod_mem_desc_t));
- mem_desc->nfrag = mem_reqs;
- mem_desc->frag =
- (iod_mem_frag_t *) malloc ((int)mem_reqs * sizeof(iod_mem_frag_t));
- k = curr_k;
+ if (NULL == mem_desc){
+ mem_desc = (iod_mem_desc_t *) malloc (sizeof(iod_mem_desc_t));
+ mem_desc->nfrag = mem_reqs;
+ mem_desc->frag =
+ (iod_mem_frag_t *) malloc ((int)mem_reqs * sizeof(iod_mem_frag_t));
+ }
+ k = 0;
- fprintf (stderr,"COMPACTOR WRITE start_reqs: %lli, mem_reqs: %lli\n",
- start_reqs, mem_reqs);
- for ( j = start_reqs; j < mem_reqs; j++){
+ fprintf (stderr,"COMPACTOR WRITE k: %lli start_reqs: %lli, mem_reqs: %lli\n",
+ k, start_reqs, mem_reqs);
+
+ for ( j = start_reqs; j < (start_reqs + mem_reqs); j++){
if ((j == curr_j) && (bytes_left)){
- mem_desc->frag[k].addr = (void *)(uintptr_t)(curr_offset);
+ mem_desc->frag[k].addr = (void *)(curr_offset);
mem_desc->frag[k].len = bytes_left;
}
else{
mem_desc->frag[k].addr = (void *)
- (uintptr_t)(list[request_counter].mblocks[j].offset);
+ (list[request_counter].mblocks[j].offset);
mem_desc->frag[k].len = list[request_counter].mblocks[j].len;
-#if 0
- fprintf(stderr,"COMPACTOR %lli: off: %lli, off: %lli len: %llu\n",
- k,
- (hsize_t)mem_desc->frag[k].addr,
- list[request_counter].mblocks[j].offset,
- mem_desc->frag[k].len);
-#endif
}
k++;
}
curr_k = k;
-#if DEBUG_COMPACTOR
- for ( j = 0; j < k; j++){
+
+ for ( j = 0 ; j < k; j++){
+
ptr = (int *)mem_desc->frag[j].addr;
fprintf(stderr, "COMPACTOR MERGED WRITE IOD_BUFFER j: %lli, k: %lli len: %llu\n",
j,k, mem_desc->frag[j].len);
for (ii = 0; ii < mem_desc->frag[j].len/sizeof(int); ii++)
fprintf(stderr, "%d ", ptr[ii]);
fprintf(stderr, "\n");
+
}
fflush(stderr);
-#endif
}
file_desc = hslabs[n];
@@ -972,29 +967,34 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
(size_t)file_desc.block[i], (size_t)file_desc.count[i]);
}
#endif
-
if(iod_array_write(iod_oh, IOD_TID_UNKNOWN, NULL,
mem_desc /*This is where the memory descriptor goes*/,
&file_desc,
- NULL, NULL) < 0)
+ NULL, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't write to array object");
+
if (NULL != mem_desc){
- if (NULL != mem_desc->frag){
- free(mem_desc->frag);
+ iod_mem_frag_t *frag = mem_desc->frag;
+ if (NULL != frag){
+ free(frag);
}
free(mem_desc);
mem_desc = NULL;
}
+
+ }
+
+
+
+ if (NULL != hslabs){
for(n=0 ; n<num_descriptors ; n++) {
free(hslabs[n].start);
free(hslabs[n].stride);
free(hslabs[n].block);
free(hslabs[n].count);
}
- if(hslabs)
- free(hslabs);
-
+ free(hslabs);
}
/* write from array object */
@@ -1008,7 +1008,6 @@ int H5VL_iod_server_compactor_write (void *_list, int num_requests)
#if H5VL_IOD_DEBUG
fprintf(stderr, "Done with dset write, sending %d response to client\n", ret_value);
#endif
-
}
diff --git a/src/H5VLiod_server.c b/src/H5VLiod_server.c
index 05959f3..df41e8b 100644
--- a/src/H5VLiod_server.c
+++ b/src/H5VLiod_server.c
@@ -1396,10 +1396,11 @@ int H5VL_iod_server_dset_compactor(op_data_t *op_data, int request_type)
FUNC_ENTER_NOAPI_NOINIT
#if DEBUG_COMPACTOR
- fprintf (stderr, "id: %d, Enters the the dset_compactor, compactor_flag :%d\n",
+ fprintf(stderr, "id: %d, Enters the the dset_compactor, compactor_flag :%d\n",
request_id,
compactor_queue_flag);
fflush(stderr);
+#endif
if (NULL == curr_queue){
if (compactor_queue_flag){
@@ -1410,13 +1411,12 @@ int H5VL_iod_server_dset_compactor(op_data_t *op_data, int request_type)
fprintf(stderr,"Compactor Not present with flag : %d\n",
compactor_queue_flag);
}
- else
- fprintf(stderr,"Queue exists with compactor_flag : %d, and %d reqs\n",
- compactor_queue_flag,
- H5VL_iod_get_number_of_requests(curr_queue));
- fflush(stderr);
-
-#endif
+ else{
+ fprintf(stderr,"Queue exists with compactor_flag : %d, and %d reqs\n", compactor_queue_flag,
+ H5VL_iod_get_number_of_requests(curr_queue));
+ fflush(stderr);
+ }
+
input = (dset_io_in_t *)op_data->input;
diff --git a/test/compactor_write_tests/Makefile b/test/compactor_write_tests/Makefile
new file mode 100644
index 0000000..2286f6a
--- /dev/null
+++ b/test/compactor_write_tests/Makefile
@@ -0,0 +1,12 @@
+all:
+ h5pcc -o multiple-writes-test1 multiple-writes-test1.c
+ h5pcc -o multiple-writes-test2 multiple-writes-test2.c
+ h5pcc -o multiple-writes-test3 multiple-writes-test3.c
+ h5pcc -o multiple-writes-test4 multiple-writes-test4.c
+ h5pcc -o multiple-writes-test5 multiple-writes-test5.c
+ h5pcc -o parallel_write parallel_write.c
+ h5pcc -o server test_server.c
+
+clean:
+ rm -rf multiple-writes-test1 multiple-writes-test2 multiple-writes-test3 multiple-writes-test4 multiple-writes-test5 server *.o *~
+ rm -rf parallel_write *.cfg
diff --git a/test/compactor_write_tests/multiple-writes-test1.c b/test/compactor_write_tests/multiple-writes-test1.c
new file mode 100644
index 0000000..3648bd7
--- /dev/null
+++ b/test/compactor_write_tests/multiple-writes-test1.c
@@ -0,0 +1,170 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+#define NX 16 /* dataset dimensions */
+#define NY 8
+#define RANK 2
+
+/*Entire write gets replaced
+write buffer values start with 10's and increases and writes increase*/
+
+
+int main (int argc, char **argv){
+
+ const char filename[] = "eff_file.h5";
+ hid_t file_id;
+ hid_t dataspaceID, dataspace2;
+ hid_t dataspace3, dataspace4;
+ hid_t dataspace5, dataspace6;
+ hid_t dset_id;
+ hid_t fapl_id, dxpl_id;
+
+ const unsigned int nelem = 60;
+ int *data = NULL, *data1 = NULL;
+ unsigned int i = 0;
+ hsize_t dimsf[2];
+ hsize_t count[2], offset[2];
+ hsize_t s[2], b[2];
+ int my_rank, my_size, ret;
+ int provided;
+ hid_t event_q, int_id;
+ int num_requests = 0;
+ H5_status_t *status = NULL;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+
+ EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size (MPI_COMM_WORLD, &my_size);
+
+
+ if (my_size > 1){
+ fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size);
+ MPI_Finalize();
+ }
+
+
+ fapl_id = H5Pcreate (H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ event_q = H5EQcreate(fapl_id);
+ assert(event_q);
+
+ file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q);
+ assert(file_id);
+
+
+ dimsf[0] = NX;
+ dimsf[1] = NY;
+
+
+ dataspaceID = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace4 = H5Screate_simple(RANK, dimsf, NULL);
+
+ dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q);
+ assert(dset_id);
+
+ count[0] = 8;
+ count[1] = 8;
+ offset[0] = 0;
+ offset[1] = 0;
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace2 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspaceID,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+ data = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data[i] = my_rank + 10;
+ }
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace2,
+ dataspaceID,
+ H5P_DEFAULT,
+ data,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ count[0] = 8;
+ count[1] = 8;
+ offset[0] = 0;
+ offset[1] = 0;
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace3 = H5Screate_simple(RANK, count, NULL);
+
+
+ H5Sselect_hyperslab(dataspace4,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+ data1 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data1[i] = my_rank + 12;
+
+ }
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace3,
+ dataspace4,
+ H5P_DEFAULT,
+ data1,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ /* assert(H5Tclose_ff(int_id, event_q) == 0);*/
+ assert(H5Dclose_ff(dset_id, event_q) == 0);
+ H5Sclose(dataspaceID);
+ H5Sclose(dataspace2);
+ H5Sclose(dataspace3);
+ H5Sclose(dataspace4);
+ H5Pclose(fapl_id);
+ assert(H5Fclose_ff(file_id, event_q) == 0);
+ H5EQwait(event_q, &num_requests, &status);
+ free(status);
+ H5EQclose (event_q);
+
+ free(data);
+ free(data1);
+ fprintf(stderr, "\n*****************************************************************************************************************\n");
+ fprintf(stderr, "Finalize EFF stack\n");
+ fprintf(stderr, "*****************************************************************************************************************\n");
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/compactor_write_tests/multiple-writes-test2.c b/test/compactor_write_tests/multiple-writes-test2.c
new file mode 100644
index 0000000..d18795e
--- /dev/null
+++ b/test/compactor_write_tests/multiple-writes-test2.c
@@ -0,0 +1,168 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+#define NX 16 /* dataset dimensions */
+#define NY 8
+#define RANK 2
+
+/*Second write -- writes more than first write but overlaps*/
+
+int main (int argc, char **argv){
+
+ const char filename[] = "eff_file.h5";
+ hid_t file_id;
+ hid_t dataspaceID, dataspace2;
+ hid_t dataspace3, dataspace4;
+ hid_t dataspace5, dataspace6;
+ hid_t dset_id;
+ hid_t fapl_id, dxpl_id;
+
+ const unsigned int nelem = 60;
+ int *data = NULL, *data1 = NULL;
+ unsigned int i = 0;
+ hsize_t dimsf[2];
+ hsize_t count[2], offset[2];
+ hsize_t s[2], b[2];
+ int my_rank, my_size, ret;
+ int provided;
+ hid_t event_q, int_id;
+ int num_requests = 0;
+ H5_status_t *status = NULL;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+
+ EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size (MPI_COMM_WORLD, &my_size);
+
+
+ if (my_size > 1){
+ fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size);
+ MPI_Finalize();
+ }
+
+
+ fapl_id = H5Pcreate (H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ event_q = H5EQcreate(fapl_id);
+ assert(event_q);
+
+ file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q);
+ assert(file_id);
+
+
+ dimsf[0] = NX;
+ dimsf[1] = NY;
+
+
+ dataspaceID = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace4 = H5Screate_simple(RANK, dimsf, NULL);
+
+ dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q);
+ assert(dset_id);
+
+ count[0] = 4;
+ count[1] = 8;
+ offset[0] = 0;
+ offset[1] = 0;
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace2 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspaceID,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+ data = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data[i] = my_rank + 10;
+ }
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace2,
+ dataspaceID,
+ H5P_DEFAULT,
+ data,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ count[0] = 8;
+ count[1] = 8;
+ offset[0] = 2;
+ offset[1] = 0;
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace3 = H5Screate_simple(RANK, count, NULL);
+
+
+ H5Sselect_hyperslab(dataspace4,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+ data1 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data1[i] = my_rank + 12;
+
+ }
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace3,
+ dataspace4,
+ H5P_DEFAULT,
+ data1,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ /* assert(H5Tclose_ff(int_id, event_q) == 0);*/
+ assert(H5Dclose_ff(dset_id, event_q) == 0);
+ H5Sclose(dataspaceID);
+ H5Sclose(dataspace2);
+ H5Sclose(dataspace3);
+ H5Sclose(dataspace4);
+ H5Pclose(fapl_id);
+ assert(H5Fclose_ff(file_id, event_q) == 0);
+ H5EQwait(event_q, &num_requests, &status);
+ free(status);
+ H5EQclose (event_q);
+
+ free(data);
+ free(data1);
+ fprintf(stderr, "\n*****************************************************************************************************************\n");
+ fprintf(stderr, "Finalize EFF stack\n");
+ fprintf(stderr, "*****************************************************************************************************************\n");
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/compactor_write_tests/multiple-writes-test3.c b/test/compactor_write_tests/multiple-writes-test3.c
new file mode 100644
index 0000000..9fa7170
--- /dev/null
+++ b/test/compactor_write_tests/multiple-writes-test3.c
@@ -0,0 +1,254 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+#define NX 16 /* dataset dimensions */
+#define NY 8
+#define RANK 2
+
+
+
+int main (int argc, char **argv){
+
+ const char filename[] = "eff_file.h5";
+ hid_t file_id;
+ hid_t dataspaceID, dataspace2;
+ hid_t dataspace3, dataspace4;
+ hid_t dataspace5, dataspace6;
+ hid_t dataspace7, dataspace8;
+ hid_t dset_id;
+ hid_t fapl_id, dxpl_id;
+
+ const unsigned int nelem = 60;
+ int *data = NULL, *data1 = NULL;
+ int *data2 = NULL, *data3 = NULL;
+ unsigned int i = 0;
+ hsize_t dimsf[2];
+ hsize_t count[2], offset[2];
+ hsize_t s[2], b[2];
+ int my_rank, my_size, ret;
+ int provided;
+ hid_t event_q, int_id;
+ int num_requests = 0;
+ H5_status_t *status = NULL;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+
+ EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size (MPI_COMM_WORLD, &my_size);
+
+
+ if (my_size > 1){
+ fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size);
+ MPI_Finalize();
+ }
+
+
+ fapl_id = H5Pcreate (H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ event_q = H5EQcreate(fapl_id);
+ assert(event_q);
+
+ file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q);
+ assert(file_id);
+
+
+ dimsf[0] = NX;
+ dimsf[1] = NY;
+
+
+ dataspaceID = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace4 = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace5 = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace7 = H5Screate_simple(RANK, dimsf, NULL);
+
+ dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q);
+ assert(dset_id);
+
+ count[0] = 4;
+ count[1] = 8;
+ offset[0] = 0;
+ offset[1] = 0;
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace2 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspaceID,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+ data = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data[i] = my_rank + 10;
+ }
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace2,
+ dataspaceID,
+ H5P_DEFAULT,
+ data,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ count[0] = 2;
+ count[1] = 8;
+ offset[0] = 2;
+ offset[1] = 0;
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace3 = H5Screate_simple(RANK, count, NULL);
+
+
+ H5Sselect_hyperslab(dataspace4,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+ data1 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data1[i] = my_rank + 12;
+ }
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace3,
+ dataspace4,
+ H5P_DEFAULT,
+ data1,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+ count[0] = 2;
+ count[1] = 8;
+ offset[0] = 6;
+ offset[1] = 0;
+
+
+ data2 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data2[i] = my_rank + 13;
+ }
+
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace6 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspace5,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace6,
+ dataspace5,
+ H5P_DEFAULT,
+ data2,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+ count[0] = 4;
+ count[1] = 8;
+ offset[0] = 3;
+ offset[1] = 0;
+
+
+ data3 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data3[i] = my_rank + 14;
+ }
+
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace8 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspace7,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace8,
+ dataspace7,
+ H5P_DEFAULT,
+ data3,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+
+
+
+ assert(H5Dclose_ff(dset_id, event_q) == 0);
+ H5Sclose(dataspaceID);
+ H5Sclose(dataspace2);
+ H5Sclose(dataspace3);
+ H5Sclose(dataspace4);
+ H5Sclose(dataspace5);
+ H5Sclose(dataspace6);
+ H5Pclose(fapl_id);
+ assert(H5Fclose_ff(file_id, event_q) == 0);
+ H5EQwait(event_q, &num_requests, &status);
+ free(status);
+ H5EQclose (event_q);
+
+ free(data);
+ free(data1);
+ fprintf(stderr, "\n*****************************************************************************************************************\n");
+ fprintf(stderr, "Finalize EFF stack\n");
+ fprintf(stderr, "*****************************************************************************************************************\n");
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/compactor_write_tests/multiple-writes-test4.c b/test/compactor_write_tests/multiple-writes-test4.c
new file mode 100644
index 0000000..0f32697
--- /dev/null
+++ b/test/compactor_write_tests/multiple-writes-test4.c
@@ -0,0 +1,254 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+#define NX 16 /* dataset dimensions */
+#define NY 8
+#define RANK 2
+
+
+
+int main (int argc, char **argv){
+
+ const char filename[] = "eff_file.h5";
+ hid_t file_id;
+ hid_t dataspaceID, dataspace2;
+ hid_t dataspace3, dataspace4;
+ hid_t dataspace5, dataspace6;
+ hid_t dataspace7, dataspace8;
+ hid_t dset_id;
+ hid_t fapl_id, dxpl_id;
+
+ const unsigned int nelem = 60;
+ int *data = NULL, *data1 = NULL;
+ int *data2 = NULL, *data3 = NULL;
+ unsigned int i = 0;
+ hsize_t dimsf[2];
+ hsize_t count[2], offset[2];
+ hsize_t s[2], b[2];
+ int my_rank, my_size, ret;
+ int provided;
+ hid_t event_q, int_id;
+ int num_requests = 0;
+ H5_status_t *status = NULL;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+
+ EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size (MPI_COMM_WORLD, &my_size);
+
+
+ if (my_size > 1){
+ fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size);
+ MPI_Finalize();
+ }
+
+
+ fapl_id = H5Pcreate (H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ event_q = H5EQcreate(fapl_id);
+ assert(event_q);
+
+ file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q);
+ assert(file_id);
+
+
+ dimsf[0] = NX;
+ dimsf[1] = NY;
+
+
+ dataspaceID = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace4 = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace5 = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace7 = H5Screate_simple(RANK, dimsf, NULL);
+
+ dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q);
+ assert(dset_id);
+
+ count[0] = 4;
+ count[1] = 8;
+ offset[0] = 0;
+ offset[1] = 0;
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace2 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspaceID,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+ data = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data[i] = my_rank + 10;
+ }
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace2,
+ dataspaceID,
+ H5P_DEFAULT,
+ data,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ count[0] = 2;
+ count[1] = 8;
+ offset[0] = 2;
+ offset[1] = 0;
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace3 = H5Screate_simple(RANK, count, NULL);
+
+
+ H5Sselect_hyperslab(dataspace4,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+ data1 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data1[i] = my_rank + 12;
+ }
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace3,
+ dataspace4,
+ H5P_DEFAULT,
+ data1,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+ count[0] = 2;
+ count[1] = 8;
+ offset[0] = 6;
+ offset[1] = 0;
+
+
+ data2 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data2[i] = my_rank + 13;
+ }
+
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace6 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspace5,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace6,
+ dataspace5,
+ H5P_DEFAULT,
+ data2,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+ count[0] = 8;
+ count[1] = 8;
+ offset[0] = 2;
+ offset[1] = 0;
+
+
+ data3 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data3[i] = my_rank + 14;
+ }
+
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace8 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspace7,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace8,
+ dataspace7,
+ H5P_DEFAULT,
+ data3,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+
+
+
+ assert(H5Dclose_ff(dset_id, event_q) == 0);
+ H5Sclose(dataspaceID);
+ H5Sclose(dataspace2);
+ H5Sclose(dataspace3);
+ H5Sclose(dataspace4);
+ H5Sclose(dataspace5);
+ H5Sclose(dataspace6);
+ H5Pclose(fapl_id);
+ assert(H5Fclose_ff(file_id, event_q) == 0);
+ H5EQwait(event_q, &num_requests, &status);
+ free(status);
+ H5EQclose (event_q);
+
+ free(data);
+ free(data1);
+ fprintf(stderr, "\n*****************************************************************************************************************\n");
+ fprintf(stderr, "Finalize EFF stack\n");
+ fprintf(stderr, "*****************************************************************************************************************\n");
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/compactor_write_tests/multiple-writes-test5.c b/test/compactor_write_tests/multiple-writes-test5.c
new file mode 100644
index 0000000..2cea44b
--- /dev/null
+++ b/test/compactor_write_tests/multiple-writes-test5.c
@@ -0,0 +1,254 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+#define NX 16 /* dataset dimensions */
+#define NY 8
+#define RANK 2
+
+
+
+int main (int argc, char **argv){
+
+ const char filename[] = "eff_file.h5";
+ hid_t file_id;
+ hid_t dataspaceID, dataspace2;
+ hid_t dataspace3, dataspace4;
+ hid_t dataspace5, dataspace6;
+ hid_t dataspace7, dataspace8;
+ hid_t dset_id;
+ hid_t fapl_id, dxpl_id;
+
+ const unsigned int nelem = 60;
+ int *data = NULL, *data1 = NULL;
+ int *data2 = NULL, *data3 = NULL;
+ unsigned int i = 0;
+ hsize_t dimsf[2];
+ hsize_t count[2], offset[2];
+ hsize_t s[2], b[2];
+ int my_rank, my_size, ret;
+ int provided;
+ hid_t event_q, int_id;
+ int num_requests = 0;
+ H5_status_t *status = NULL;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+
+ EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size (MPI_COMM_WORLD, &my_size);
+
+
+ if (my_size > 1){
+ fprintf(stderr, "APP processes = %d cannot be greater than 1 \n", my_size);
+ MPI_Finalize();
+ }
+
+
+ fapl_id = H5Pcreate (H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ event_q = H5EQcreate(fapl_id);
+ assert(event_q);
+
+ file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q);
+ assert(file_id);
+
+
+ dimsf[0] = NX;
+ dimsf[1] = NY;
+
+
+ dataspaceID = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace4 = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace5 = H5Screate_simple(RANK, dimsf, NULL);
+ dataspace7 = H5Screate_simple(RANK, dimsf, NULL);
+
+ dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q);
+ assert(dset_id);
+
+ count[0] = 4;
+ count[1] = 4;
+ offset[0] = 0;
+ offset[1] = 0;
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace2 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspaceID,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+ data = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data[i] = my_rank + 10;
+ }
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace2,
+ dataspaceID,
+ H5P_DEFAULT,
+ data,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+ count[0] = 4;
+ count[1] = 4;
+ offset[0] = 6;
+ offset[1] = 0;
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace3 = H5Screate_simple(RANK, count, NULL);
+
+
+ H5Sselect_hyperslab(dataspace4,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+ data1 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data1[i] = my_rank + 12;
+ }
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace3,
+ dataspace4,
+ H5P_DEFAULT,
+ data1,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+ count[0] = 4;
+ count[1] = 4;
+ offset[0] = 3;
+ offset[1] = 0;
+
+
+ data2 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data2[i] = my_rank + 13;
+ }
+
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace6 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspace5,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace6,
+ dataspace5,
+ H5P_DEFAULT,
+ data2,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+ count[0] = 8;
+ count[1] = 5;
+ offset[0] = 0;
+ offset[1] = 3;
+
+
+ data3 = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data3[i] = my_rank + 14;
+ }
+
+
+
+ fprintf (stderr, "%d: count[0]:%lli, count[1]:%lli, offset[0]: %lli, offset[1]: %lli\n",
+ my_rank,
+ count[0], count[1], offset[0], offset[1]);
+
+ dataspace8 = H5Screate_simple(RANK, count, NULL);
+
+ H5Sselect_hyperslab(dataspace7,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace8,
+ dataspace7,
+ H5P_DEFAULT,
+ data3,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+
+
+
+ assert(H5Dclose_ff(dset_id, event_q) == 0);
+ H5Sclose(dataspaceID);
+ H5Sclose(dataspace2);
+ H5Sclose(dataspace3);
+ H5Sclose(dataspace4);
+ H5Sclose(dataspace5);
+ H5Sclose(dataspace6);
+ H5Pclose(fapl_id);
+ assert(H5Fclose_ff(file_id, event_q) == 0);
+ H5EQwait(event_q, &num_requests, &status);
+ free(status);
+ H5EQclose (event_q);
+
+ free(data);
+ free(data1);
+ fprintf(stderr, "\n*****************************************************************************************************************\n");
+ fprintf(stderr, "Finalize EFF stack\n");
+ fprintf(stderr, "*****************************************************************************************************************\n");
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/compactor_write_tests/parallel_write.c b/test/compactor_write_tests/parallel_write.c
new file mode 100644
index 0000000..e996388
--- /dev/null
+++ b/test/compactor_write_tests/parallel_write.c
@@ -0,0 +1,126 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+#define NX 8 /* dataset dimensions */
+#define NY 8
+#define RANK 2
+
+
+
+int main (int argc, char **argv){
+
+ const char filename[] = "eff_file.h5";
+ hid_t file_id;
+ hid_t dataspaceID, dataspace2;
+ hid_t dset_id;
+ hid_t fapl_id, dxpl_id;
+
+ const unsigned int nelem = 60;
+ int *data = NULL;
+ unsigned int i = 0;
+ hsize_t dimsf[2];
+ hsize_t count[2], offset[2];
+
+ int my_rank, my_size, ret;
+ int provided;
+ hid_t event_q, int_id;
+
+ int num_requests = 0;
+ H5_status_t *status = NULL;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ fprintf(stderr, "MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+ EFF_init (MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size (MPI_COMM_WORLD, &my_size);
+
+ fprintf(stderr, "APP processes = %d, my rank is %d\n", my_size, my_rank);
+
+ fapl_id = H5Pcreate (H5P_FILE_ACCESS);
+ H5Pset_fapl_iod(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
+
+ event_q = H5EQcreate(fapl_id);
+ assert(event_q);
+
+ file_id = H5Fcreate_ff(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id, event_q);
+ assert(file_id);
+
+
+ dimsf[0] = NX;
+ dimsf[1] = NY;
+
+ /* int_id = H5Tcopy(H5T_STD_I32LE);
+ ret = H5Tcommit_ff(file_id, "int", int_id, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT,
+ 0, event_q);
+
+ assert(ret == 0);*/
+
+ dataspaceID = H5Screate_simple(RANK, dimsf, NULL);
+
+ dset_id = H5Dcreate_ff(file_id, "D1" , H5T_NATIVE_INT, dataspaceID,
+ H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT, 0, event_q);
+ assert(dset_id);
+
+
+
+ count[0] = NY/my_size;
+ count[1] = 8;
+ offset[0] = my_rank * count[0];
+ offset[1] = 0;
+
+ dataspace2 = H5Screate_simple(RANK, count, NULL);
+
+
+ H5Sselect_hyperslab(dataspaceID,
+ H5S_SELECT_SET,
+ offset,
+ NULL,
+ count,
+ NULL);
+
+
+ data = (int *) malloc(sizeof(int)*count[0]*count[1]);
+ for (i=0; i < count[0]*count[1]; i++) {
+ data[i] = my_rank + 10;
+ }
+
+
+ ret = H5Dwrite_ff(dset_id,
+ H5T_NATIVE_INT,
+ dataspace2,
+ dataspaceID,
+ H5P_DEFAULT,
+ data,
+ 0,
+ event_q);
+ assert(ret == 0);
+
+
+
+ assert(H5Dclose_ff(dset_id, event_q) == 0);
+ H5Sclose(dataspaceID);
+ H5Sclose(dataspace2);
+ H5Pclose(fapl_id);
+ assert(H5Fclose_ff(file_id, event_q) == 0);
+ H5EQwait(event_q, &num_requests, &status);
+ free(status);
+ H5EQclose (event_q);
+
+ free(data);
+ fprintf(stderr, "\n*****************************************************************************************************************\n");
+ fprintf(stderr, "Finalize EFF stack\n");
+ fprintf(stderr, "*****************************************************************************************************************\n");
+ EFF_finalize();
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/compactor_write_tests/test_server.c b/test/compactor_write_tests/test_server.c
new file mode 100644
index 0000000..5d71e35
--- /dev/null
+++ b/test/compactor_write_tests/test_server.c
@@ -0,0 +1,48 @@
+/*
+ * test_server.c: Server side of Milestone 3.3 Asynchronous I/O and initial
+ * IOD VOL plugin demonstration.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include "mpi.h"
+#include "hdf5.h"
+
+int main(int argc, char **argv) {
+ int my_size, my_rank;
+ int provided;
+
+ MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+ if(MPI_THREAD_MULTIPLE != provided) {
+ printf("MPI does not have MPI_THREAD_MULTIPLE support\n");
+ exit(1);
+ }
+
+ MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &my_size);
+ printf("Number of server processes = %d, my rank is %d\n", my_size, my_rank);
+
+ H5open();
+ /* This call initiliazes the FS for the server processes (create metadata and
+ bulk data handles). It also registers with the Function shipper the
+ HDF5 VOL server routines that will be executed when the clients ship the VOL
+ routines. Then it executes a loop to receive requests from clients that map
+ to one of the registered callbacks.
+
+ Whenever a request is received, the corresponding callback is called which
+ inserts the operation into the AXE (the Asynchronous eXecution Engine) and
+ returns to receive another request. The AXE schedules that operation to
+ execute by assigning a thread to it. After the operation is complete, the
+ result is returned to the client through the function shipper complete call.
+
+ Finally, when all clients send a terminate call, the function shipper interface
+ is finalized the operation returns. */
+ H5VLiod_start_handler(MPI_COMM_WORLD, MPI_INFO_NULL);
+ H5close();
+ MPI_Finalize();
+
+ return 0;
+}
+