summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Richard Warren <Richard.Warren@hdfgroup.org> 2020-07-01 16:15:16 (GMT)
committer Richard Warren <Richard.Warren@hdfgroup.org> 2020-07-01 16:15:16 (GMT)
commit 8b7167488091054e77f2b5ece385963f3cd1f923 (patch)
tree 1769b9a2a3141ffa6a35d7d212e76b709700330e
parent 5a6d9b3a4e12cc399ce27361755f0c8d069e256b (diff)
download hdf5-8b7167488091054e77f2b5ece385963f3cd1f923.zip
hdf5-8b7167488091054e77f2b5ece385963f3cd1f923.tar.gz
hdf5-8b7167488091054e77f2b5ece385963f3cd1f923.tar.bz2
Various bug fixes for the multi-threaded IO concentrator(s)
-rw-r--r-- src/H5FDsubfile.c 21
-rw-r--r-- src/H5FDsubfile_mpi.c 340
-rw-r--r-- src/H5FDsubfile_private.h 8
-rw-r--r-- src/H5FDsubfile_threads.c 41
-rw-r--r-- testpar/CMakeLists.txt 1
-rw-r--r-- testpar/Makefile.am 2
-rw-r--r-- testpar/t_subfile_openclose.c 16
-rw-r--r-- testpar/t_subfile_readwrite.c 4
8 files changed, 298 insertions, 135 deletions
diff --git a/src/H5FDsubfile.c b/src/H5FDsubfile.c
index accded7..a467533 100644
--- a/src/H5FDsubfile.c
+++ b/src/H5FDsubfile.c
@@ -127,11 +127,11 @@ int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_s
char *this_data = (char *)data_buffer;
ssize_t bytes_remaining = (ssize_t) data_size;
ssize_t written = 0;
-
while(bytes_remaining) {
if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) {
perror("pwrite failed!");
fflush(stdout);
+ break;
}
else {
if (sf_verbose_flag) {
@@ -234,14 +234,29 @@ herr_t
H5FDsubfiling_finalize(void)
{
herr_t ret_value = SUCCEED; /* Return value */
+ sf_topology_t *thisApp = NULL;
FUNC_ENTER_API(FAIL)
H5TRACE0("e","");
/* Shutdown the IO Concentrator threads */
- sf_shutdown_flag = 1;
- usleep(100);
+
+ if (topology_id != H5I_INVALID_HID) {
+ thisApp = get_subfiling_object(topology_id);
+ }
+
+ if (thisApp && thisApp->rank_is_ioc) {
+ begin_thread_exclusive();
+ sf_shutdown_flag = 1;
+ end_thread_exclusive();
+
+ usleep(100);
+
+ wait_for_thread_main();
+ }
+
MPI_Barrier(MPI_COMM_WORLD);
+
delete_subfiling_context(context_id);
FUNC_LEAVE_API(ret_value)
diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c
index c66c6b7..fda4928 100644
--- a/src/H5FDsubfile_mpi.c
+++ b/src/H5FDsubfile_mpi.c
@@ -262,7 +262,7 @@ static void gather_topology_info(sf_topology_t *info)
if (info->topology)
return;
- if (sf_world_size > 1) {
+ if (sf_world_size) {
long hostid = gethostid();
layout_t my_hostinfo;
layout_t *topology = (layout_t *)calloc((size_t)sf_world_size+1, sizeof(layout_t));
@@ -275,10 +275,12 @@ static void gather_topology_info(sf_topology_t *info)
my_hostinfo.rank = sf_world_rank;
my_hostinfo.hostid = hostid;
info->topology[sf_world_rank] = my_hostinfo;
- if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG,
- info->topology, 2, MPI_LONG,
- MPI_COMM_WORLD) == MPI_SUCCESS) {
- qsort(info->topology, (size_t)sf_world_size, sizeof(layout_t), compare_hostid);
+ if (sf_world_size > 1) {
+ if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG,
+ info->topology, 2, MPI_LONG,
+ MPI_COMM_WORLD) == MPI_SUCCESS) {
+ qsort(info->topology, (size_t)sf_world_size, sizeof(layout_t), compare_hostid);
+ }
}
}
}
@@ -294,6 +296,8 @@ static int count_nodes(sf_topology_t *info)
nextid = info->topology[0].hostid;
info->node_ranks = (int *)calloc((size_t)(info->world_size+1), sizeof(int));
+ assert(info->node_ranks != NULL);
+
if (nextid == info->hostid)
hostid_index = 0;
@@ -326,12 +330,13 @@ H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisap
int node_index;
int iocs_per_node = 1;
char *envValue = NULL;
- sf_topology_t *app_topology = (sf_topology_t *)calloc(1, sizeof(sf_topology_t));
+ sf_topology_t *app_topology = (sf_topology_t *)malloc(sizeof(sf_topology_t));
assert(app_topology != NULL);
+ memset(app_topology, 0, sizeof(sf_topology_t));
app_topology->world_size = world_size;
app_topology->world_rank = world_rank;
- io_concentrator = (int *)calloc((size_t)world_size, sizeof(int));
+ io_concentrator = (int *)malloc(((size_t)world_size * sizeof(int)));
assert(io_concentrator != NULL);
ioc_count = count_nodes (app_topology);
/* FIXME: This should ONLY be used for testing!
@@ -402,14 +407,13 @@ int
H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc)
{
int status;
- subfiling_context_t *next = (subfiling_context_t *)
- malloc(sizeof(subfiling_context_t));
+ subfiling_context_t *next = (subfiling_context_t *) malloc(sizeof(subfiling_context_t));
+ assert(next != NULL);
+ memset(next,0, sizeof(subfiling_context_t));
+
if (io_concentrator == NULL) {
goto err_exit;
}
- if (next == NULL) {
- goto err_exit;
- }
else {
int k;
char *envValue = NULL;
@@ -445,6 +449,13 @@ H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int wor
k++;
app_leader = k;
+ /* Do this now rather than having the ioc thread
+ * update the value
+ */
+ if (rank_is_ioc) {
+ sf_stripe_size = next->sf_stripe_size;
+ }
+
if (sf_verbose_flag && (world_rank == 0)) {
printf("app_leader = %d and ioc_leader = %d\n", app_leader, ioc_leader);
}
@@ -461,6 +472,9 @@ H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int wor
* If so, then can probably initialize those things here!
*/
}
+ else {
+ next->sf_group_comm = MPI_COMM_NULL;
+ }
if (rank_is_ioc) {
status = initialize_ioc_threads(next);
@@ -470,6 +484,9 @@ H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int wor
return 0;
err_exit:
+ if (next) {
+ free(next);
+ }
return -1;
}
@@ -634,23 +651,32 @@ static MPI_Datatype H5FD__create_f_l_mpi_type(subfiling_context_t *context,
int temp_disps[64];
int *blocks = temp_blocks;
int *disps = temp_disps;
+#if 0
+ /* Depth in bytes might be incorrect... How? */
+ if (total_bytes < target_write_bytes) {
+ int64_t remaining = target_write_bytes - total_bytes;
+ ioc_depth = (remaining / stripe_size) +1;
+ }
+#endif
if (ioc_depth > 64) {
blocks = (int *)calloc((size_t)ioc_depth, sizeof(int));
disps = (int *)calloc((size_t)ioc_depth, sizeof(int));
}
+
blocks[0] = (int)first_write;
disps[0] = 0;
- for(k=1; k < ioc_depth; k++) {
- total_bytes += stripe_size;
- blocks[k] = (int)stripe_size;
- disps[k] = (int)next_offset;
- next_offset += context->sf_blocksize_per_stripe;
- }
- blocks[k-1] = (int)last_write;
+
+ for(k=1; k < ioc_depth; k++) {
+ blocks[k] = (int)stripe_size;
+ disps[k] = (int)next_offset;
+ next_offset += context->sf_blocksize_per_stripe;
+ }
+ blocks[k-1] = (int)last_write;
+ if (ioc_depth > 2) total_bytes += (int64_t)((ioc_depth - 2) * stripe_size);
if (total_bytes != target_write_bytes) {
- printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n",
- __func__, total_bytes, target_write_bytes);
+ printf("[%d] Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n",
+ sf_world_rank, __func__, total_bytes, target_write_bytes);
}
if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) {
@@ -887,6 +913,7 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int
static int *acks = NULL;
static int *indices = NULL;
static MPI_Request *reqs = NULL;
+ static MPI_Request *completed = NULL;
static MPI_Status *stats = NULL;
static int64_t *source_data_offset = NULL;
static int64_t *ioc_write_datasize = NULL;
@@ -895,6 +922,7 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int
subfiling_context_t *sf_context = get_subfiling_object(context_id);
int i, target, ioc, n_waiting = 0, status = 0;
+ int awaiting_completion = 0;
int errors = 0;
if (acks == NULL) {
if ((acks = (int *)calloc((size_t)n_io_concentrators*2, sizeof(int))) == NULL) {
@@ -909,6 +937,12 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int
return -1;
}
}
+ if (completed == NULL) {
+ if ((completed = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) {
+ perror("calloc");
+ return -1;
+ }
+ }
if (stats == NULL) {
if ((stats = (MPI_Status *)calloc((size_t)n_io_concentrators, sizeof(MPI_Status))) == NULL) {
perror("calloc");
@@ -1028,13 +1062,40 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int
for(i=0; i < ready; i++) {
/* One of the Issend calls has completed
- * Wait for another ACK to indicate the data as been written
+ * Wait for another ACK to indicate that the data has been written
* to the subfile.
*/
+#if 0
acks[indices[i]] = 0;
+ MPI_Irecv(&acks[indices[i]], 1, MPI_INT, io_concentrator[indices[i]], COMPLETED, sf_context->sf_data_comm, &completed[indices[i]]);
+ awaiting_completion++;
+#endif
n_waiting--;
}
}
+ while(awaiting_completion) {
+ int ready = 0;
+ status = MPI_Waitsome(n_io_concentrators, completed, &ready, indices, stats);
+ if (status != MPI_SUCCESS) {
+ int len;
+ char estring[MPI_MAX_ERROR_STRING];
+ MPI_Error_string(status, estring, &len);
+ printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n",
+ sf_world_rank, __func__, estring );
+ fflush(stdout);
+ errors++;
+ }
+
+ for(i=0; i < ready; i++) {
+ /* One of the Issend calls has completed
+ * Wait for another ACK to indicate that the data has been written
+ * to the subfile.
+ */
+ acks[indices[i]] = 0;
+ awaiting_completion--;
+ }
+ }
+
if (errors) return -1;
return status;
}
@@ -1088,8 +1149,6 @@ int sf_open_subfiles(hid_t context_id, char *prefix, int flags)
int ioc_acks[n_io_concentrators];
MPI_Request reqs[n_io_concentrators];
subfiling_context_t *sf_context = get_subfiling_object(context_id);
-
- sf_stripe_size = sf_context->sf_stripe_size;
if ((sf_context->subfile_prefix != NULL) && (prefix != NULL)) {
if (strcmp(sf_context->subfile_prefix, prefix) != 0) {
@@ -1136,6 +1195,7 @@ ioc_main(subfiling_context_t *context)
int subfile_rank;
int flag, ret;
int max_work_depth;
+ int my_shutdown_flag = 0;
MPI_Status status, msg_status;
sf_work_request_t *incoming_requests = NULL;
useconds_t delay = 20;
@@ -1147,8 +1207,8 @@ ioc_main(subfiling_context_t *context)
assert(request_count_per_rank != NULL);
}
- max_work_depth = sf_world_size * MAX_WORK_PER_RANK;
- incoming_requests = (sf_work_request_t *)calloc((size_t)max_work_depth, sizeof(sf_work_request_t));
+ max_work_depth = MAX(8, sf_world_size * MAX_WORK_PER_RANK);
+ incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth +1), sizeof(sf_work_request_t));
assert(incoming_requests != NULL);
#ifdef DEBUG_TRACING
@@ -1185,7 +1245,6 @@ ioc_main(subfiling_context_t *context)
source, tag, context->sf_msg_comm, &msg_status);
}
if (ret == MPI_SUCCESS) {
-
#ifdef DEBUG_TRACING
printf("[[ioc(%d) msg from %d tag=%x, datasize=%ld, foffset=%ld]]\n", subfile_rank, source, tag,
incoming_requests[sf_workinprogress].header[0],
@@ -1199,22 +1258,36 @@ ioc_main(subfiling_context_t *context)
tpool_add_work(msg);
}
else {
+ int index = atomic_load(&sf_workinprogress);
incoming_requests[sf_workinprogress].tag = tag;
incoming_requests[sf_workinprogress].source = source;
incoming_requests[sf_workinprogress].subfile_rank = subfile_rank;
tpool_add_work(&incoming_requests[sf_workinprogress]);
- atomic_fetch_add(&sf_workinprogress, 1); // atomic
- atomic_compare_exchange_strong(&sf_workinprogress, &max_work_depth, 0);
+ if (index == max_work_depth -1) {
+ atomic_init(&sf_workinprogress, 0);
+ }
+ else {
+ atomic_fetch_add(&sf_workinprogress, 1); // atomic
+ }
}
}
}
- else usleep(delay);
+ else {
+ begin_thread_exclusive();
+ my_shutdown_flag = sf_shutdown_flag;
+ end_thread_exclusive();
+ usleep(delay);
+ }
}
#ifdef DEBUG_TRACING
fclose(sf_logfile);
#endif
+ if (incoming_requests) {
+ free(incoming_requests);
+ }
+
return 0;
}
@@ -1268,7 +1341,7 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_
MPI_Status msg_status;
int64_t data_size = msg->header[0];
int64_t file_offset = msg->header[1];
-
+ int fd;
if (sf_verbose_flag) {
printf("[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld\n", subfile_rank, __func__, source, data_size, file_offset );
fflush(stdout);
@@ -1297,14 +1370,20 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_
subfile_rank, __func__, data_size, source);
fflush(stdout);
}
- if (sf_write_data(subfile_fid, file_offset, recv_buffer, data_size, subfile_rank ) < 0) {
+
+ if ((fd = subfile_fid) < 0) {
+ printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", subfile_rank, __func__, subfile_fid);
+ fflush(stdout);
+ }
+ else if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank ) < 0) {
free(recv_buffer);
recv_buffer = NULL;
printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__);
fflush(stdout);
return -1;
}
- // send_ack__(source, WRITE_COMPLETED, tinfo->comm);
+ /* Done... */
+ // send_ack__(source, subfile_rank, COMPLETED, comm);
if (recv_buffer) {
free(recv_buffer);
}
@@ -1343,7 +1422,10 @@ int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_C
return ret;
}
- if (send_buffer) free(send_buffer);
+ if (send_buffer) {
+ free(send_buffer);
+ send_buffer = NULL;
+ }
return 0;
}
@@ -1352,18 +1434,16 @@ int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_C
int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
{
int ret, req_count, errors=0;
- int prev_count;
+ int ref_count;
int flags = (int)(msg->header[0] & 0x0ffffffff);
-
atomic_fetch_add(&sf_file_refcount, 1); // atomic
-
+ ref_count = atomic_load(&sf_file_refcount);
if (sf_verbose_flag) {
printf("[ioc(%d) %s] file open flags = %0x, source=%d\n", subfile_rank, __func__, flags, source);
fflush(stdout);
}
- if (subfile_fid < 0) {
- errors = subfiling_open_file(sf_subfile_prefix, subfile_rank, flags);
- }
+
+ errors = subfiling_open_file(sf_subfile_prefix, subfile_rank, flags);
req_count = COMPLETED;
ret = MPI_Send(&req_count, 1, MPI_INT, source, COMPLETED, comm);
@@ -1391,9 +1471,12 @@ int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file
close_count = atomic_load(&sf_file_close_count);
open_count = atomic_load(&sf_file_refcount);
- if (close_count == open_count) {
+ if (close_count == sf_world_size) {
atomic_store(&sf_file_refcount, 0);
atomic_store(&sf_file_close_count, 0); /* Complete the reset to zeros */
+ while (!tpool_is_empty()) { /* poll until the thread-pool queue reports empty, then fall through to the callback */
+ usleep(10);
+ }
if (callback_ftn(subfile_rank, comm) < 0) {
printf("[ioc(%d) %s] callback_ftn returned an error\n", subfile_rank, __func__ );
fflush(stdout);
@@ -1412,9 +1495,21 @@ int subfiling_close_file(int subfile_rank, MPI_Comm comm)
{
int ret, source = 0;
int errors = 0, flag = COMPLETED;
+
+#if 0
+ printf("[ioc(%d) %s] subfile_fid = %d\n", subfile_rank, __func__, subfile_fid);
+ fflush(stdout);
+#endif
if (subfile_fid >= 0) {
- close(subfile_fid);
+
+#if 0
+ if (close(subfile_fid) < 0) {
+ perror("subfiling_close_file");
+ }
subfile_fid = -1;
+#else
+ fdatasync(subfile_fid);
+#endif
}
/* Notify all ranks */
for (source = 0; source < sf_world_size; source++) {
@@ -1443,100 +1538,105 @@ int subfiling_close_file(int subfile_rank, MPI_Comm comm)
int subfiling_open_file(const char *prefix, int subfile_rank, int flags)
{
int errors = 0;
- /* Only the real IOCs open the subfiles */
+ /* Only the real IOCs open the subfiles
+ * Once a file is opened, all additional file open requests
+ * can return immediately.
+ */
if (subfile_rank >= 0) {
- const char *dotconfig = ".subfile_config";
- mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
- char filepath[PATH_MAX];
- char config[PATH_MAX];
- if (prefix) {
- mkdir(prefix, S_IRWXU);
- sprintf(filepath, "%s/node_local_temp_%d_of_%d",
- prefix, subfile_rank, n_io_concentrators);
- sprintf(config, "%s/%s", prefix, dotconfig);
- }
- else {
- sprintf(filepath, "node_local_temp_%d_of_%d",
- subfile_rank,n_io_concentrators);
- strcpy(config, dotconfig);
- }
-
- if ((subfile_fid = open(filepath, flags, mode)) < 0) {
- perror("subfile open");
- errors++;
- goto done;
- }
- else {
- /*
- * File open and close operations are collective
- * in intent. Apart from actual IO operations, we
- * initialize the number of references to everyone
- * so that we detect when pending IO operations are
- * have all completed before we close the actual
- * subfiles.
- */
- atomic_init((atomic_int *)&sf_workinprogress, sf_world_size);
- }
+ char filepath[PATH_MAX];
+ char config[PATH_MAX];
+
+
+ if (subfile_fid < 0) {
+ const char *dotconfig = ".subfile_config";
+ mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+ if (prefix) {
+ mkdir(prefix, S_IRWXU);
+ sprintf(filepath, "%s/node_local_temp_%d_of_%d",
+ prefix, subfile_rank, n_io_concentrators);
+ sprintf(config, "%s/%s", prefix, dotconfig);
+ }
+ else {
+ sprintf(filepath, "node_local_temp_%d_of_%d",
+ subfile_rank,n_io_concentrators);
+ strcpy(config, dotconfig);
+ }
- if ((subfile_rank == 0) && (flags & O_CREAT)) {
- size_t bufsize = PATH_MAX + 16;
- FILE *f = NULL;
- char linebuf[bufsize];
- /* If a config file already exists, AND
- * the user wants to truncate subfiles (if they exist),
- * then we should also truncate an existing config file.
- */
- if (access(config, flags) == 0) {
- truncate(config, 0);
- }
- f = fopen(config, "w+");
- if (f != NULL) {
- int k;
- char *underscore = strrchr(filepath,'_');
- *underscore=0;
- strcpy(config, filepath);
- *underscore='_';
- sprintf(linebuf,"stripe_size=%ld\n", sf_stripe_size);
- fwrite(linebuf, strlen(linebuf), 1, f);
- sprintf(linebuf,"aggregator_count=%d\n",n_io_concentrators);
- fwrite(linebuf, strlen(linebuf), 1, f);
-
- for(k=0; k < n_io_concentrators; k++) {
- snprintf(linebuf,bufsize,"%s_%d:%d\n",config, k, io_concentrator[k]);
- fwrite(linebuf, strlen(linebuf), 1, f);
- }
+ begin_thread_exclusive();
- fclose(f);
- }
- else {
- perror("fopen(config)");
+ if ((subfile_fid = open(filepath, flags, mode)) < 0) {
+ perror("subfile open");
+ end_thread_exclusive();
errors++;
goto done;
}
- }
- if (sf_verbose_flag) {
- printf("[ioc:%d] Opened subfile %s\n", subfile_rank, filepath);
- }
+
+ end_thread_exclusive();
+
+ if (flags & O_CREAT) {
+ size_t bufsize = PATH_MAX + 16;
+ FILE *f = NULL;
+ char linebuf[bufsize];
+ /* If a config file already exists, AND
+ * the user wants to truncate subfiles (if they exist),
+ * then we should also truncate an existing config file.
+ */
+ if (access(config, flags) == 0) {
+ truncate(config, 0);
+ }
+ f = fopen(config, "w+");
+ if (f != NULL) {
+ int k;
+ char *underscore = strrchr(filepath,'_');
+ *underscore=0;
+ strcpy(config, filepath);
+ *underscore='_';
+ sprintf(linebuf,"stripe_size=%ld\n", sf_stripe_size);
+ fwrite(linebuf, strlen(linebuf), 1, f);
+ sprintf(linebuf,"aggregator_count=%d\n",n_io_concentrators);
+ fwrite(linebuf, strlen(linebuf), 1, f);
+
+ for(k=0; k < n_io_concentrators; k++) {
+ snprintf(linebuf,bufsize,"%s_%d:%d\n",config, k, io_concentrator[k]);
+ fwrite(linebuf, strlen(linebuf), 1, f);
+ }
+
+ fclose(f);
+ }
+ else {
+ perror("fopen(config)");
+ errors++;
+ goto done;
+ }
+ }
+ if (sf_verbose_flag) {
+ printf("[ioc:%d] Opened subfile %s\n", subfile_rank, filepath);
+ }
+ }
+
}
done:
return errors;
}
+
+
void
delete_subfiling_context(hid_t context_id)
{
subfiling_context_t *sf_context = get_subfiling_object(context_id);
- MPI_Comm_free(&sf_context->sf_msg_comm);
- MPI_Comm_free(&sf_context->sf_data_comm);
- sf_msg_comm = MPI_COMM_NULL;
- sf_data_comm = MPI_COMM_NULL;
- if (n_io_concentrators > 1) {
- MPI_Comm_free(&sf_context->sf_group_comm);
- MPI_Comm_free(&sf_context->sf_intercomm);
+ if (sf_context) {
+ MPI_Comm_free(&sf_context->sf_msg_comm);
+ MPI_Comm_free(&sf_context->sf_data_comm);
+ sf_msg_comm = MPI_COMM_NULL;
+ sf_data_comm = MPI_COMM_NULL;
+ if (n_io_concentrators > 1) {
+ MPI_Comm_free(&sf_context->sf_group_comm);
+ MPI_Comm_free(&sf_context->sf_intercomm);
+ }
+ free(sf_context);
}
-
- free(sf_context);
- usleep(100);
+ usleep(100);
return;
}
diff --git a/src/H5FDsubfile_private.h b/src/H5FDsubfile_private.h
index 0774d7f..0088c13 100644
--- a/src/H5FDsubfile_private.h
+++ b/src/H5FDsubfile_private.h
@@ -73,7 +73,7 @@ typedef struct {
} sf_topology_t;
#define K(n) ((n)*1024)
-#define DEFAULT_STRIPE_SIZE K(4) /* (1024*1024) */
+#define DEFAULT_STRIPE_SIZE K(256) /* (256*1024) */
#define MAX_DEPTH 256
typedef enum io_ops {
@@ -121,6 +121,7 @@ typedef enum {
#define ACK_PART (0x0acc << 8)
#define DATA_PART (0xd8da << 8)
+#define READY (0xfeed << 8)
#define COMPLETED (0xfed1 << 8)
#define READ_INDEP (READ_OP)
@@ -165,6 +166,7 @@ H5_DLL void * get_subfiling_object(int64_t object_id);
H5_DLL hid_t get_subfiling_context(void);
H5_DLL int initialize_ioc_threads(subfiling_context_t *sf_context);
H5_DLL int tpool_add_work(sf_work_request_t *);
+H5_DLL bool tpool_is_empty(void);
H5_DLL int ioc_main(subfiling_context_t *context);
H5_DLL int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
H5_DLL int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
@@ -182,5 +184,9 @@ H5_DLL int sf_write_independent(hid_t context_id, int64_t offset, int64_t elemen
H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank);
H5_DLL void delete_subfiling_context(hid_t context_id);
H5_DLL void finalize_ioc_threads(void);
+H5_DLL int begin_thread_exclusive(void);
+H5_DLL int end_thread_exclusive(void);
+H5_DLL int wait_for_thread_main(void);
+H5_DLL int finalize_subfile_close(void);
#endif
diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c
index 9c54a53..fb99930 100644
--- a/src/H5FDsubfile_threads.c
+++ b/src/H5FDsubfile_threads.c
@@ -13,10 +13,13 @@
#include "mercury/mercury_thread_spin.c"
static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER;
+static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static hg_thread_pool_t *ioc_thread_pool = NULL;
static hg_thread_t ioc_thread;
+#ifndef HG_TEST_NUM_THREADS_DEFAULT
#define HG_TEST_NUM_THREADS_DEFAULT 4
+#endif
#define POOL_CONCURRENT_MAX 64
static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX];
@@ -42,6 +45,12 @@ initialize_ioc_threads(subfiling_context_t *sf_context)
puts("hg_thread_mutex_init failed");
goto err_exit;
}
+ status = hg_thread_mutex_init(&ioc_thread_mutex);
+ if (status) {
+ puts("hg_thread_mutex_init failed");
+ goto err_exit;
+ }
+
status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool);
if (status) {
puts("hg_thread_pool_init failed");
@@ -64,10 +73,6 @@ void __attribute__((destructor)) finalize_ioc_threads(void)
if (ioc_thread_pool != NULL) {
hg_thread_pool_destroy(ioc_thread_pool);
ioc_thread_pool = NULL;
-
- if (hg_thread_join(ioc_thread) == 0)
- puts("thread_join succeeded");
- else puts("thread_join failed");
}
}
@@ -94,10 +99,8 @@ handle_work_request(void *arg)
status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm);
break;
case CLOSE_OP:
- hg_thread_mutex_lock(&ioc_mutex);
status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm,
subfiling_close_file);
- hg_thread_mutex_unlock(&ioc_mutex);
break;
case OPEN_OP:
status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm);
@@ -130,3 +133,29 @@ int tpool_add_work(sf_work_request_t *work)
hg_thread_mutex_unlock(&ioc_mutex);
return 0;
}
+
+bool tpool_is_empty(void) /* Returns the result of HG_QUEUE_IS_EMPTY on the IOC thread pool's queue. */
+{
+ return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); /* NOTE(review): dereferences ioc_thread_pool with no NULL check and without taking a mutex here -- confirm callers only use it while the pool exists */
+}
+
+int begin_thread_exclusive(void) /* Locks ioc_thread_mutex; pair each call with end_thread_exclusive(). */
+{
+ return hg_thread_mutex_lock(&ioc_thread_mutex); /* hands back hg_thread_mutex_lock's status unchanged */
+}
+
+int end_thread_exclusive(void) /* Unlocks ioc_thread_mutex, the mutex locked by begin_thread_exclusive(). */
+{
+ return hg_thread_mutex_unlock(&ioc_thread_mutex); /* hands back hg_thread_mutex_unlock's status unchanged */
+}
+
+int wait_for_thread_main(void) /* Joins ioc_thread and reports the outcome on stdout; returns 0 on success, -1 on failure. */
+{
+ if (hg_thread_join(ioc_thread) == 0) /* a zero result from hg_thread_join is treated as success */
+ puts("thread_join succeeded");
+ else {
+ puts("thread_join failed");
+ return -1; /* join failure is surfaced to the caller */
+ }
+ return 0;
+}
diff --git a/testpar/CMakeLists.txt b/testpar/CMakeLists.txt
index 3d0fe7e..c95e01f 100644
--- a/testpar/CMakeLists.txt
+++ b/testpar/CMakeLists.txt
@@ -76,6 +76,7 @@ set (H5P_TESTS
t_filters_parallel
t_2Gio
t_subfile_openclose
+ t_subfile_readwrite
)
foreach (h5_testp ${H5P_TESTS})
diff --git a/testpar/Makefile.am b/testpar/Makefile.am
index 9f8eca9..2914cb1 100644
--- a/testpar/Makefile.am
+++ b/testpar/Makefile.am
@@ -31,7 +31,7 @@ check_SCRIPTS = $(TEST_SCRIPT_PARA)
# Test programs. These are our main targets.
#
TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pread t_pshutdown t_prestart t_init_term t_shapesame t_filters_parallel t_2Gio \
- t_subfile_openclose
+ t_subfile_openclose t_subfile_readwrite
# t_pflush1 and t_pflush2 are used by testpflush.sh
check_PROGRAMS = $(TEST_PROG_PARA) t_pflush1 t_pflush2
diff --git a/testpar/t_subfile_openclose.c b/testpar/t_subfile_openclose.c
index 9f31b6e..fe39f2c 100644
--- a/testpar/t_subfile_openclose.c
+++ b/testpar/t_subfile_openclose.c
@@ -9,13 +9,21 @@ main(int argc, char **argv)
{
int i, mpi_size, mpi_rank;
+ int loop_count = 20;
int mpi_provides, require = MPI_THREAD_MULTIPLE;
- hid_t subfile_id;
+ hid_t subfile_id = -1;
MPI_Init_thread(&argc, &argv, require, &mpi_provides);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+ if (argc > 1) {
+ int check_value = atoi(argv[1]);
+ if (check_value > 0) {
+ loop_count = check_value;
+ }
+ }
+
H5open();
if (H5FDsubfiling_init() == SUCCEED) {
@@ -26,15 +34,15 @@ main(int argc, char **argv)
puts("Error: Unable to initialize subfiling!");
}
- for(i=0; i < 10; i++) {
+ for(i=0; i < loop_count; i++) {
sf_open_subfiles(subfile_id, NULL, O_CREAT|O_TRUNC|O_RDWR);
sf_close_subfiles(subfile_id);
}
- MPI_Barrier(MPI_COMM_WORLD);
-
H5FDsubfiling_finalize();
+ MPI_Barrier(MPI_COMM_WORLD);
+
H5close();
MPI_Finalize();
diff --git a/testpar/t_subfile_readwrite.c b/testpar/t_subfile_readwrite.c
index 24d40e3..b4c798a 100644
--- a/testpar/t_subfile_readwrite.c
+++ b/testpar/t_subfile_readwrite.c
@@ -158,6 +158,10 @@ done:
free(local_data);
local_data = NULL;
}
+ if (verify_data) {
+ free(verify_data);
+ verify_data = NULL;
+ }
H5close();