From 8b7167488091054e77f2b5ece385963f3cd1f923 Mon Sep 17 00:00:00 2001 From: Richard Warren Date: Wed, 1 Jul 2020 12:15:16 -0400 Subject: Various bug fixes for the multi-threaded IO concentrator(s) --- src/H5FDsubfile.c | 21 ++- src/H5FDsubfile_mpi.c | 340 +++++++++++++++++++++++++++--------------- src/H5FDsubfile_private.h | 8 +- src/H5FDsubfile_threads.c | 41 ++++- testpar/CMakeLists.txt | 1 + testpar/Makefile.am | 2 +- testpar/t_subfile_openclose.c | 16 +- testpar/t_subfile_readwrite.c | 4 + 8 files changed, 298 insertions(+), 135 deletions(-) diff --git a/src/H5FDsubfile.c b/src/H5FDsubfile.c index accded7..a467533 100644 --- a/src/H5FDsubfile.c +++ b/src/H5FDsubfile.c @@ -127,11 +127,11 @@ int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_s char *this_data = (char *)data_buffer; ssize_t bytes_remaining = (ssize_t) data_size; ssize_t written = 0; - while(bytes_remaining) { if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) { perror("pwrite failed!"); fflush(stdout); + break; } else { if (sf_verbose_flag) { @@ -234,14 +234,29 @@ herr_t H5FDsubfiling_finalize(void) { herr_t ret_value = SUCCEED; /* Return value */ + sf_topology_t *thisApp = NULL; FUNC_ENTER_API(FAIL) H5TRACE0("e",""); /* Shutdown the IO Concentrator threads */ - sf_shutdown_flag = 1; - usleep(100); + + if (topology_id != H5I_INVALID_HID) { + thisApp = get_subfiling_object(topology_id); + } + + if (thisApp && thisApp->rank_is_ioc) { + begin_thread_exclusive(); + sf_shutdown_flag = 1; + end_thread_exclusive(); + + usleep(100); + + wait_for_thread_main(); + } + MPI_Barrier(MPI_COMM_WORLD); + delete_subfiling_context(context_id); FUNC_LEAVE_API(ret_value) diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c index c66c6b7..fda4928 100644 --- a/src/H5FDsubfile_mpi.c +++ b/src/H5FDsubfile_mpi.c @@ -262,7 +262,7 @@ static void gather_topology_info(sf_topology_t *info) if (info->topology) return; - if (sf_world_size > 1) { + if (sf_world_size) { long hostid = gethostid(); layout_t my_hostinfo; layout_t *topology = (layout_t *)calloc((size_t)sf_world_size+1, sizeof(layout_t)); @@ -275,10 +275,12 @@ static void gather_topology_info(sf_topology_t *info) my_hostinfo.rank = sf_world_rank; my_hostinfo.hostid = hostid; info->topology[sf_world_rank] = my_hostinfo; - if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG, - info->topology, 2, MPI_LONG, - MPI_COMM_WORLD) == MPI_SUCCESS) { - qsort(info->topology, (size_t)sf_world_size, sizeof(layout_t), compare_hostid); + if (sf_world_size > 1) { + if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG, + info->topology, 2, MPI_LONG, + MPI_COMM_WORLD) == MPI_SUCCESS) { + qsort(info->topology, (size_t)sf_world_size, sizeof(layout_t), compare_hostid); + } } } } @@ -294,6 +296,8 @@ static int count_nodes(sf_topology_t *info) nextid = info->topology[0].hostid; info->node_ranks = (int *)calloc((size_t)(info->world_size+1), sizeof(int)); + assert(info->node_ranks != NULL); + if (nextid == info->hostid) hostid_index = 0; @@ -326,12 +330,13 @@ H5FD__determine_ioc_count(int world_size, int world_rank, sf_topology_t **thisap int node_index; int iocs_per_node = 1; char *envValue = NULL; - sf_topology_t *app_topology = (sf_topology_t *)calloc(1, sizeof(sf_topology_t)); + sf_topology_t *app_topology = (sf_topology_t *)malloc(sizeof(sf_topology_t)); assert(app_topology != NULL); + memset(app_topology, 0, sizeof(sf_topology_t)); app_topology->world_size = world_size; app_topology->world_rank = world_rank; - io_concentrator = (int *)calloc((size_t)world_size, sizeof(int)); + io_concentrator = (int *)malloc(((size_t)world_size * sizeof(int))); assert(io_concentrator != NULL); ioc_count = count_nodes (app_topology); /* FIXME: This should ONLY be used for testing! @@ -402,14 +407,13 @@ int H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int world_size, int world_rank, bool rank_is_ioc) { int status; - subfiling_context_t *next = (subfiling_context_t *) - malloc(sizeof(subfiling_context_t)); + subfiling_context_t *next = (subfiling_context_t *) malloc(sizeof(subfiling_context_t)); + assert(next != NULL); + memset(next,0, sizeof(subfiling_context_t)); + if (io_concentrator == NULL) { goto err_exit; } - if (next == NULL) { - goto err_exit; - } else { int k; char *envValue = NULL; @@ -445,6 +449,13 @@ H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int wor k++; app_leader = k; + /* Do this now rather than having the ioc thread + * update the value + */ + if (rank_is_ioc) { + sf_stripe_size = next->sf_stripe_size; + } + if (sf_verbose_flag && (world_rank == 0)) { printf("app_leader = %d and ioc_leader = %d\n", app_leader, ioc_leader); } @@ -461,6 +472,9 @@ H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int wor * If so, then can probably initialize those things here! */ } + else { + next->sf_group_comm = MPI_COMM_NULL; + } if (rank_is_ioc) { status = initialize_ioc_threads(next); @@ -470,6 +484,9 @@ H5FD__init_subfile_context(subfiling_context_t **newContext, int n_iocs, int wor return 0; err_exit: + if (next) { + free(next); + } return -1; } @@ -634,23 +651,32 @@ static MPI_Datatype H5FD__create_f_l_mpi_type(subfiling_context_t *context, int temp_disps[64]; int *blocks = temp_blocks; int *disps = temp_disps; +#if 0 + /* Depth in bytes might be incorrect... How? */ + if (total_bytes < target_write_bytes) { + int64_t remaining = target_write_bytes - total_bytes; + ioc_depth = (remaining / stripe_size) +1; + } +#endif if (ioc_depth > 64) { blocks = (int *)calloc((size_t)ioc_depth, sizeof(int)); disps = (int *)calloc((size_t)ioc_depth, sizeof(int)); } + blocks[0] = (int)first_write; disps[0] = 0; - for(k=1; k < ioc_depth; k++) { - total_bytes += stripe_size; - blocks[k] = (int)stripe_size; - disps[k] = (int)next_offset; - next_offset += context->sf_blocksize_per_stripe; - } - blocks[k-1] = (int)last_write; + + for(k=1; k < ioc_depth; k++) { + blocks[k] = (int)stripe_size; + disps[k] = (int)next_offset; + next_offset += context->sf_blocksize_per_stripe; + } + blocks[k-1] = (int)last_write; + if (ioc_depth > 2) total_bytes += (int64_t)((ioc_depth - 2) * stripe_size); if (total_bytes != target_write_bytes) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", - __func__, total_bytes, target_write_bytes); + printf("[%d] Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", + sf_world_rank, __func__, total_bytes, target_write_bytes); } if (MPI_Type_indexed(ioc_depth, blocks, disps, MPI_BYTE, &newType) != MPI_SUCCESS) { @@ -887,6 +913,7 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int static int *acks = NULL; static int *indices = NULL; static MPI_Request *reqs = NULL; + static MPI_Request *completed = NULL; static MPI_Status *stats = NULL; static int64_t *source_data_offset = NULL; static int64_t *ioc_write_datasize = NULL; @@ -895,6 +922,7 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int subfiling_context_t *sf_context = get_subfiling_object(context_id); int i, target, ioc, n_waiting = 0, status = 0; + int awaiting_completion = 0; int errors = 0; if (acks == NULL) { if ((acks = (int *)calloc((size_t)n_io_concentrators*2, sizeof(int))) == NULL) { @@ -909,6 +937,12 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int return -1; } } + if (completed == NULL) { + if ((completed = (MPI_Request *)calloc((size_t)n_io_concentrators, sizeof(MPI_Request))) == NULL) { + perror("calloc"); + return -1; + } + } if (stats == NULL) { if ((stats = (MPI_Status *)calloc((size_t)n_io_concentrators, sizeof(MPI_Status))) == NULL) { perror("calloc"); @@ -1028,13 +1062,40 @@ int sf_write_independent(hid_t context_id, int64_t offset, int64_t elements, int for(i=0; i < ready; i++) { /* One of the Issend calls has completed - * Wait for another ACK to indicate the data as been written + * Wait for another ACK to indicate that the data as been written * to the subfile. */ +#if 0 acks[indices[i]] = 0; + MPI_Irecv(&acks[indices[i]], 1, MPI_INT, io_concentrator[indices[i]], COMPLETED, sf_context->sf_data_comm, &completed[indices[i]]); + awaiting_completion++; +#endif n_waiting--; } } + while(awaiting_completion) { + int ready = 0; + status = MPI_Waitsome(n_io_concentrators, completed, &ready, indices, stats); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d %s] MPI_ERROR! MPI_Waitsome returned an error(%s)\n", + sf_world_rank, __func__, estring ); + fflush(stdout); + errors++; + } + + for(i=0; i < ready; i++) { + /* One of the Issend calls has completed + * Wait for another ACK to indicate that the data as been written + * to the subfile. + */ + acks[indices[i]] = 0; + awaiting_completion--; + } + } + if (errors) return -1; return status; } @@ -1088,8 +1149,6 @@ int sf_open_subfiles(hid_t context_id, char *prefix, int flags) int ioc_acks[n_io_concentrators]; MPI_Request reqs[n_io_concentrators]; subfiling_context_t *sf_context = get_subfiling_object(context_id); - - sf_stripe_size = sf_context->sf_stripe_size; if ((sf_context->subfile_prefix != NULL) && (prefix != NULL)) { if (strcmp(sf_context->subfile_prefix, prefix) != 0) { @@ -1136,6 +1195,7 @@ ioc_main(subfiling_context_t *context) int subfile_rank; int flag, ret; int max_work_depth; + int my_shutdown_flag = 0; MPI_Status status, msg_status; sf_work_request_t *incoming_requests = NULL; useconds_t delay = 20; @@ -1147,8 +1207,8 @@ ioc_main(subfiling_context_t *context) assert(request_count_per_rank != NULL); } - max_work_depth = sf_world_size * MAX_WORK_PER_RANK; - incoming_requests = (sf_work_request_t *)calloc((size_t)max_work_depth, sizeof(sf_work_request_t)); + max_work_depth = MAX(8, sf_world_size * MAX_WORK_PER_RANK); + incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth +1), sizeof(sf_work_request_t)); assert(incoming_requests != NULL); #ifdef DEBUG_TRACING @@ -1185,7 +1245,6 @@ ioc_main(subfiling_context_t *context) source, tag, context->sf_msg_comm, &msg_status); } if (ret == MPI_SUCCESS) { - #ifdef DEBUG_TRACING printf("[[ioc(%d) msg from %d tag=%x, datasize=%ld, foffset=%ld]]\n", subfile_rank, source, tag, incoming_requests[sf_workinprogress].header[0], @@ -1199,22 +1258,36 @@ ioc_main(subfiling_context_t *context) tpool_add_work(msg); } else { + int index = atomic_load(&sf_workinprogress); incoming_requests[sf_workinprogress].tag = tag; incoming_requests[sf_workinprogress].source = source; incoming_requests[sf_workinprogress].subfile_rank = subfile_rank; tpool_add_work(&incoming_requests[sf_workinprogress]); - atomic_fetch_add(&sf_workinprogress, 1); // atomic - atomic_compare_exchange_strong(&sf_workinprogress, &max_work_depth, 0); + if (index == max_work_depth -1) { + atomic_init(&sf_workinprogress, 0); + } + else { + atomic_fetch_add(&sf_workinprogress, 1); // atomic + } } } } - else usleep(delay); + else { + begin_thread_exclusive(); + my_shutdown_flag = sf_shutdown_flag; + end_thread_exclusive(); + usleep(delay); + } } #ifdef DEBUG_TRACING fclose(sf_logfile); #endif + if (incoming_requests) { + free(incoming_requests); + } + return 0; } @@ -1268,7 +1341,7 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_ MPI_Status msg_status; int64_t data_size = msg->header[0]; int64_t file_offset = msg->header[1]; - + int fd; if (sf_verbose_flag) { printf("[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld\n", subfile_rank, __func__, source, data_size, file_offset ); fflush(stdout); @@ -1297,14 +1370,20 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_ subfile_rank, __func__, data_size, source); fflush(stdout); } - if (sf_write_data(subfile_fid, file_offset, recv_buffer, data_size, subfile_rank ) < 0) { + + if ((fd = subfile_fid) < 0) { + printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", subfile_rank, __func__, subfile_fid); + fflush(stdout); + } + else if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank ) < 0) { free(recv_buffer); recv_buffer = NULL; printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__); fflush(stdout); return -1; } - // send_ack__(source, WRITE_COMPLETED, tinfo->comm); + /* Done... */ + // send_ack__(source, subfile_rank, COMPLETED, comm); if (recv_buffer) { free(recv_buffer); } @@ -1343,7 +1422,10 @@ int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_C return ret; } - if (send_buffer) free(send_buffer); + if (send_buffer) { + free(send_buffer); + send_buffer = NULL; + } return 0; } @@ -1352,18 +1434,16 @@ int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_C int queue_file_open(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) { int ret, req_count, errors=0; - int prev_count; + int ref_count; int flags = (int)(msg->header[0] & 0x0ffffffff); - atomic_fetch_add(&sf_file_refcount, 1); // atomic - + ref_count = atomic_load(&sf_file_refcount); if (sf_verbose_flag) { printf("[ioc(%d) %s] file open flags = %0x, source=%d\n", subfile_rank, __func__, flags, source); fflush(stdout); } - if (subfile_fid < 0) { - errors = subfiling_open_file(sf_subfile_prefix, subfile_rank, flags); - } + + errors = subfiling_open_file(sf_subfile_prefix, subfile_rank, flags); req_count = COMPLETED; ret = MPI_Send(&req_count, 1, MPI_INT, source, COMPLETED, comm); @@ -1391,9 +1471,12 @@ int decrement_file_ref_counts( int subfile_rank, int source, MPI_Comm comm, file close_count = atomic_load(&sf_file_close_count); open_count = atomic_load(&sf_file_refcount); - if (close_count == open_count) { + if (close_count == sf_world_size) { atomic_store(&sf_file_refcount, 0); atomic_store(&sf_file_close_count, 0); /* Complete the reset to zeros */ + while (!tpool_is_empty) { + usleep(10); + } if (callback_ftn(subfile_rank, comm) < 0) { printf("[ioc(%d) %s] callback_ftn returned an error\n", subfile_rank, __func__ ); fflush(stdout); @@ -1412,9 +1495,21 @@ int subfiling_close_file(int subfile_rank, MPI_Comm comm) { int ret, source = 0; int errors = 0, flag = COMPLETED; + +#if 0 + printf("[ioc(%d) %s] subfile_fid = %d\n", subfile_rank, __func__, subfile_fid); + fflush(stdout); +#endif if (subfile_fid >= 0) { - close(subfile_fid); + +#if 0 + if (close(subfile_fid) < 0) { + perror("subfiling_close_file"); + } subfile_fid = -1; +#else + fdatasync(subfile_fid); +#endif } /* Notify all ranks */ for (source = 0; source < sf_world_size; source++) { @@ -1443,100 +1538,105 @@ int subfiling_close_file(int subfile_rank, MPI_Comm comm) int subfiling_open_file(const char *prefix, int subfile_rank, int flags) { int errors = 0; - /* Only the real IOCs open the subfiles */ + /* Only the real IOCs open the subfiles + * Once a file is opened, all additional file open requests + * can return immediately. + */ if (subfile_rank >= 0) { - const char *dotconfig = ".subfile_config"; - mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - char filepath[PATH_MAX]; - char config[PATH_MAX]; - if (prefix) { - mkdir(prefix, S_IRWXU); - sprintf(filepath, "%s/node_local_temp_%d_of_%d", - prefix, subfile_rank, n_io_concentrators); - sprintf(config, "%s/%s", prefix, dotconfig); - } - else { - sprintf(filepath, "node_local_temp_%d_of_%d", - subfile_rank,n_io_concentrators); - strcpy(config, dotconfig); - } - - if ((subfile_fid = open(filepath, flags, mode)) < 0) { - perror("subfile open"); - errors++; - goto done; - } - else { - /* - * File open and close operations are collective - * in intent. Apart from actual IO operations, we - * initialize the number of references to everyone - * so that we detect when pending IO operations are - * have all completed before we close the actual - * subfiles. - */ - atomic_init((atomic_int *)&sf_workinprogress, sf_world_size); - } + char filepath[PATH_MAX]; + char config[PATH_MAX]; + + + if (subfile_fid < 0) { + const char *dotconfig = ".subfile_config"; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + if (prefix) { + mkdir(prefix, S_IRWXU); + sprintf(filepath, "%s/node_local_temp_%d_of_%d", + prefix, subfile_rank, n_io_concentrators); + sprintf(config, "%s/%s", prefix, dotconfig); + } + else { + sprintf(filepath, "node_local_temp_%d_of_%d", + subfile_rank,n_io_concentrators); + strcpy(config, dotconfig); + } - if ((subfile_rank == 0) && (flags & O_CREAT)) { - size_t bufsize = PATH_MAX + 16; - FILE *f = NULL; - char linebuf[bufsize]; - /* If a config file already exists, AND - * the user wants to truncate subfiles (if they exist), - * then we should also truncate an existing config file. - */ - if (access(config, flags) == 0) { - truncate(config, 0); - } - f = fopen(config, "w+"); - if (f != NULL) { - int k; - char *underscore = strrchr(filepath,'_'); - *underscore=0; - strcpy(config, filepath); - *underscore='_'; - sprintf(linebuf,"stripe_size=%ld\n", sf_stripe_size); - fwrite(linebuf, strlen(linebuf), 1, f); - sprintf(linebuf,"aggregator_count=%d\n",n_io_concentrators); - fwrite(linebuf, strlen(linebuf), 1, f); - - for(k=0; k < n_io_concentrators; k++) { - snprintf(linebuf,bufsize,"%s_%d:%d\n",config, k, io_concentrator[k]); - fwrite(linebuf, strlen(linebuf), 1, f); - } + begin_thread_exclusive(); - fclose(f); - } - else { - perror("fopen(config)"); + if ((subfile_fid = open(filepath, flags, mode)) < 0) { + perror("subfile open"); + end_thread_exclusive(); errors++; goto done; } - } - if (sf_verbose_flag) { - printf("[ioc:%d] Opened subfile %s\n", subfile_rank, filepath); - } + + end_thread_exclusive(); + + if (flags & O_CREAT) { + size_t bufsize = PATH_MAX + 16; + FILE *f = NULL; + char linebuf[bufsize]; + /* If a config file already exists, AND + * the user wants to truncate subfiles (if they exist), + * then we should also truncate an existing config file. + */ + if (access(config, flags) == 0) { + truncate(config, 0); + } + f = fopen(config, "w+"); + if (f != NULL) { + int k; + char *underscore = strrchr(filepath,'_'); + *underscore=0; + strcpy(config, filepath); + *underscore='_'; + sprintf(linebuf,"stripe_size=%ld\n", sf_stripe_size); + fwrite(linebuf, strlen(linebuf), 1, f); + sprintf(linebuf,"aggregator_count=%d\n",n_io_concentrators); + fwrite(linebuf, strlen(linebuf), 1, f); + + for(k=0; k < n_io_concentrators; k++) { + snprintf(linebuf,bufsize,"%s_%d:%d\n",config, k, io_concentrator[k]); + fwrite(linebuf, strlen(linebuf), 1, f); + } + + fclose(f); + } + else { + perror("fopen(config)"); + errors++; + goto done; + } + } + if (sf_verbose_flag) { + printf("[ioc:%d] Opened subfile %s\n", subfile_rank, filepath); + } + } + } done: return errors; } + + void delete_subfiling_context(hid_t context_id) { subfiling_context_t *sf_context = get_subfiling_object(context_id); - MPI_Comm_free(&sf_context->sf_msg_comm); - MPI_Comm_free(&sf_context->sf_data_comm); - sf_msg_comm = MPI_COMM_NULL; - sf_data_comm = MPI_COMM_NULL; - if (n_io_concentrators > 1) { - MPI_Comm_free(&sf_context->sf_group_comm); - MPI_Comm_free(&sf_context->sf_intercomm); + if (sf_context) { + MPI_Comm_free(&sf_context->sf_msg_comm); + MPI_Comm_free(&sf_context->sf_data_comm); + sf_msg_comm = MPI_COMM_NULL; + sf_data_comm = MPI_COMM_NULL; + if (n_io_concentrators > 1) { + MPI_Comm_free(&sf_context->sf_group_comm); + MPI_Comm_free(&sf_context->sf_intercomm); + } + free(sf_context); } - - free(sf_context); - usleep(100); + usleep(100); return; } diff --git a/src/H5FDsubfile_private.h b/src/H5FDsubfile_private.h index 0774d7f..0088c13 100644 --- a/src/H5FDsubfile_private.h +++ b/src/H5FDsubfile_private.h @@ -73,7 +73,7 @@ typedef struct { } sf_topology_t; #define K(n) ((n)*1024) -#define DEFAULT_STRIPE_SIZE K(4) /* (1024*1024) */ +#define DEFAULT_STRIPE_SIZE K(256) /* (1024*1024) */ #define MAX_DEPTH 256 typedef enum io_ops { @@ -121,6 +121,7 @@ typedef enum { #define ACK_PART (0x0acc << 8) #define DATA_PART (0xd8da << 8) +#define READY (0xfeed << 8) #define COMPLETED (0xfed1 << 8) #define READ_INDEP (READ_OP) @@ -165,6 +166,7 @@ H5_DLL void * get_subfiling_object(int64_t object_id); H5_DLL hid_t get_subfiling_context(void); H5_DLL int initialize_ioc_threads(subfiling_context_t *sf_context); H5_DLL int tpool_add_work(sf_work_request_t *); +H5_DLL bool tpool_is_empty(void); H5_DLL int ioc_main(subfiling_context_t *context); H5_DLL int queue_write_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); H5_DLL int queue_read_coll(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); @@ -182,5 +184,9 @@ H5_DLL int sf_write_independent(hid_t context_id, int64_t offset, int64_t elemen H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank); H5_DLL void delete_subfiling_context(hid_t context_id); H5_DLL void finalize_ioc_threads(void); +H5_DLL int begin_thread_exclusive(void); +H5_DLL int end_thread_exclusive(void); +H5_DLL int wait_for_thread_main(void); +H5_DLL int finalize_subfile_close(void); #endif diff --git a/src/H5FDsubfile_threads.c b/src/H5FDsubfile_threads.c index 9c54a53..fb99930 100644 --- a/src/H5FDsubfile_threads.c +++ b/src/H5FDsubfile_threads.c @@ -13,10 +13,13 @@ #include "mercury/mercury_thread_spin.c" static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; +static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER; static hg_thread_pool_t *ioc_thread_pool = NULL; static hg_thread_t ioc_thread; +#ifndef HG_TEST_NUM_THREADS_DEFAULT #define HG_TEST_NUM_THREADS_DEFAULT 4 +#endif #define POOL_CONCURRENT_MAX 64 static struct hg_thread_work pool_request[POOL_CONCURRENT_MAX]; @@ -42,6 +45,12 @@ initialize_ioc_threads(subfiling_context_t *sf_context) puts("hg_thread_mutex_init failed"); goto err_exit; } + status = hg_thread_mutex_init(&ioc_thread_mutex); + if (status) { + puts("hg_thread_mutex_init failed"); + goto err_exit; + } + status = hg_thread_pool_init(HG_TEST_NUM_THREADS_DEFAULT, &ioc_thread_pool); if (status) { puts("hg_thread_pool_init failed"); @@ -64,10 +73,6 @@ void __attribute__((destructor)) finalize_ioc_threads(void) if (ioc_thread_pool != NULL) { hg_thread_pool_destroy(ioc_thread_pool); ioc_thread_pool = NULL; - - if (hg_thread_join(ioc_thread) == 0) - puts("thread_join succeeded"); - else puts("thread_join failed"); } } @@ -94,10 +99,8 @@ handle_work_request(void *arg) status = queue_read_indep( msg, msg->subfile_rank, msg->source, sf_data_comm); break; case CLOSE_OP: - hg_thread_mutex_lock(&ioc_mutex); status = decrement_file_ref_counts( msg->subfile_rank, msg->source, sf_data_comm, subfiling_close_file); - hg_thread_mutex_unlock(&ioc_mutex); break; case OPEN_OP: status = queue_file_open( msg, msg->subfile_rank, msg->source, sf_data_comm); @@ -130,3 +133,29 @@ int tpool_add_work(sf_work_request_t *work) hg_thread_mutex_unlock(&ioc_mutex); return 0; } + +bool tpool_is_empty(void) +{ + return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); +} + +int begin_thread_exclusive(void) +{ + return hg_thread_mutex_lock(&ioc_thread_mutex); +} + +int end_thread_exclusive(void) +{ + return hg_thread_mutex_unlock(&ioc_thread_mutex); +} + +int wait_for_thread_main(void) +{ + if (hg_thread_join(ioc_thread) == 0) + puts("thread_join succeeded"); + else { + puts("thread_join failed"); + return -1; + } + return 0; +} diff --git a/testpar/CMakeLists.txt b/testpar/CMakeLists.txt index 3d0fe7e..c95e01f 100644 --- a/testpar/CMakeLists.txt +++ b/testpar/CMakeLists.txt @@ -76,6 +76,7 @@ set (H5P_TESTS t_filters_parallel t_2Gio t_subfile_openclose + t_subfile_readwrite ) foreach (h5_testp ${H5P_TESTS}) diff --git a/testpar/Makefile.am b/testpar/Makefile.am index 9f8eca9..2914cb1 100644 --- a/testpar/Makefile.am +++ b/testpar/Makefile.am @@ -31,7 +31,7 @@ check_SCRIPTS = $(TEST_SCRIPT_PARA) # Test programs. These are our main targets. # TEST_PROG_PARA=t_mpi t_bigio testphdf5 t_cache t_cache_image t_pread t_pshutdown t_prestart t_init_term t_shapesame t_filters_parallel t_2Gio \ - t_subfile_openclose + t_subfile_openclose t_subfile_readwrite # t_pflush1 and t_pflush2 are used by testpflush.sh check_PROGRAMS = $(TEST_PROG_PARA) t_pflush1 t_pflush2 diff --git a/testpar/t_subfile_openclose.c b/testpar/t_subfile_openclose.c index 9f31b6e..fe39f2c 100644 --- a/testpar/t_subfile_openclose.c +++ b/testpar/t_subfile_openclose.c @@ -9,13 +9,21 @@ main(int argc, char **argv) { int i, mpi_size, mpi_rank; + int loop_count = 20; int mpi_provides, require = MPI_THREAD_MULTIPLE; - hid_t subfile_id; + hid_t subfile_id = -1; MPI_Init_thread(&argc, &argv, require, &mpi_provides); MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + if (argc > 1) { + int check_value = atoi(argv[1]); + if (check_value > 0) { + loop_count = check_value; + } + } + H5open(); if (H5FDsubfiling_init() == SUCCEED) { @@ -26,15 +34,15 @@ main(int argc, char **argv) puts("Error: Unable to initialize subfiling!"); } - for(i=0; i < 10; i++) { + for(i=0; i < loop_count; i++) { sf_open_subfiles(subfile_id, NULL, O_CREAT|O_TRUNC|O_RDWR); sf_close_subfiles(subfile_id); } - MPI_Barrier(MPI_COMM_WORLD); - H5FDsubfiling_finalize(); + MPI_Barrier(MPI_COMM_WORLD); + H5close(); MPI_Finalize(); diff --git a/testpar/t_subfile_readwrite.c b/testpar/t_subfile_readwrite.c index 24d40e3..b4c798a 100644 --- a/testpar/t_subfile_readwrite.c +++ b/testpar/t_subfile_readwrite.c @@ -158,6 +158,10 @@ done: free(local_data); local_data = NULL; } + if (verify_data) { + free(verify_data); + verify_data = NULL; + } H5close(); -- cgit v0.12