diff options
author | jhendersonHDF <jhenderson@hdfgroup.org> | 2022-09-16 16:17:30 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-16 16:17:30 (GMT) |
commit | 16aa2dbaa0e70bf81f4329a70a45c601433549bb (patch) | |
tree | 7c6debf81d393d9294a2e6d79ca36b53d485348d /src/H5FDsubfiling/H5FDioc_threads.c | |
parent | 45178c87a3099a9fef8bae6f7249ca306cf89629 (diff) | |
download | hdf5-16aa2dbaa0e70bf81f4329a70a45c601433549bb.zip hdf5-16aa2dbaa0e70bf81f4329a70a45c601433549bb.tar.gz hdf5-16aa2dbaa0e70bf81f4329a70a45c601433549bb.tar.bz2 |
Subfiling VFD updates (#2106)
Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_threads.c')
-rw-r--r-- | src/H5FDsubfiling/H5FDioc_threads.c | 300 |
1 files changed, 196 insertions, 104 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_threads.c b/src/H5FDsubfiling/H5FDioc_threads.c index 813fb3f..b3e8ebc 100644 --- a/src/H5FDsubfiling/H5FDioc_threads.c +++ b/src/H5FDsubfiling/H5FDioc_threads.c @@ -72,16 +72,16 @@ static double sf_queue_delay_time = 0.0; static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg); static int ioc_main(ioc_data_t *ioc_data); -static int ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm, +static int ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter); -static int ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); +static int ioc_file_queue_read_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, + uint32_t counter); static int ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, - int subfile_rank); -static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, - int subfile_rank); -static int ioc_file_truncate(int fd, int64_t length, int subfile_rank); -static int ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); + int ioc_idx); +static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx); +static int ioc_file_truncate(sf_work_request_t *msg); +static int ioc_file_report_eof(sf_work_request_t *msg, MPI_Comm comm); static ioc_io_queue_entry_t *ioc_io_queue_alloc_entry(void); static void ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr); @@ -156,6 +156,8 @@ initialize_ioc_threads(void *_sf_context) #endif }; + sf_context->ioc_data = ioc_data; + /* Initialize atomic vars */ atomic_init(&ioc_data->sf_ioc_ready, 0); atomic_init(&ioc_data->sf_shutdown_flag, 0); @@ -194,7 +196,7 @@ initialize_ioc_threads(void *_sf_context) t_end = MPI_Wtime(); #ifdef H5FD_IOC_DEBUG - if (sf_context->topology->subfile_rank == 0) { + if (sf_context->topology->ioc_idx == 0) { HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start)); HDfflush(stdout); } @@ -202,8 +204,6 @@ initialize_ioc_threads(void *_sf_context) #endif - sf_context->ioc_data = ioc_data; - done: H5_SUBFILING_FUNC_LEAVE; } @@ -245,6 +245,7 @@ finalize_ioc_threads(void *_sf_context) ioc_data->io_queue.num_failed); HDfree(ioc_data); + sf_context->ioc_data = NULL; H5_SUBFILING_FUNC_LEAVE; } @@ -346,7 +347,6 @@ ioc_main(ioc_data_t *ioc_data) { subfiling_context_t *context = NULL; sf_work_request_t wk_req; - int subfile_rank; int shutdown_requested; int ret_value = 0; @@ -362,8 +362,6 @@ ioc_main(ioc_data_t *ioc_data) * represent an open file). */ - subfile_rank = context->sf_group_rank; - /* tell initialize_ioc_threads() that ioc_main() is ready to enter its main loop */ atomic_store(&ioc_data->sf_ioc_ready, 1); @@ -415,11 +413,11 @@ ioc_main(ioc_data_t *ioc_data) queue_start_time = MPI_Wtime(); - wk_req.tag = tag; - wk_req.source = source; - wk_req.subfile_rank = subfile_rank; - wk_req.context_id = ioc_data->sf_context_id; - wk_req.start_time = queue_start_time; + wk_req.tag = tag; + wk_req.source = source; + wk_req.ioc_idx = context->topology->ioc_idx; + wk_req.context_id = ioc_data->sf_context_id; + wk_req.start_time = queue_start_time; ioc_io_queue_add_entry(ioc_data, &wk_req); @@ -506,7 +504,7 @@ handle_work_request(void *arg) subfiling_context_t *sf_context = NULL; sf_work_request_t *msg = &(q_entry_ptr->wk_req); ioc_data_t *ioc_data = NULL; - int64_t file_context_id = msg->header[2]; + int64_t file_context_id = msg->context_id; int op_ret; hg_thread_ret_t ret_value = 0; @@ -524,27 +522,27 @@ handle_work_request(void *arg) switch (msg->tag) { case WRITE_INDEP: - op_ret = ioc_file_queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm, + op_ret = ioc_file_queue_write_indep(msg, msg->ioc_idx, msg->source, sf_context->sf_data_comm, q_entry_ptr->counter); break; case READ_INDEP: - op_ret = ioc_file_queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm); + op_ret = ioc_file_queue_read_indep(msg, msg->ioc_idx, msg->source, sf_context->sf_data_comm, + q_entry_ptr->counter); break; case TRUNC_OP: - op_ret = ioc_file_truncate(sf_context->sf_fid, q_entry_ptr->wk_req.header[0], - sf_context->topology->subfile_rank); + op_ret = ioc_file_truncate(msg); break; case GET_EOF_OP: - op_ret = ioc_file_report_eof(msg, msg->subfile_rank, msg->source, sf_context->sf_eof_comm); + op_ret = ioc_file_report_eof(msg, sf_context->sf_eof_comm); break; default: #ifdef H5_SUBFILING_DEBUG H5_subfiling_log(file_context_id, "%s: IOC %d received unknown message with tag %x from rank %d", - __func__, msg->subfile_rank, msg->tag, msg->source); + __func__, msg->ioc_idx, msg->tag, msg->source); #endif op_ret = -1; @@ -555,11 +553,11 @@ handle_work_request(void *arg) if (op_ret < 0) { #ifdef H5_SUBFILING_DEBUG - H5_subfiling_log( - file_context_id, - "%s: IOC %d request(%s) filename=%s from rank(%d), size=%ld, offset=%ld FAILED with ret %d", - __func__, msg->subfile_rank, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename, - msg->source, msg->header[0], msg->header[1], op_ret); + H5_subfiling_log(file_context_id, + "%s: IOC %d request(%s) from rank(%d), (%" PRId64 ", %" PRId64 ", %" PRId64 + ") FAILED with ret %d", + __func__, msg->ioc_idx, translate_opcode((io_op_t)msg->tag), msg->source, + msg->header[0], msg->header[1], msg->header[2], op_ret); #endif q_entry_ptr->wk_ret = op_ret; @@ -686,15 +684,15 @@ from the thread pool threads... *------------------------------------------------------------------------- */ static int -ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm, - uint32_t counter) +ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter) { subfiling_context_t *sf_context = NULL; MPI_Status msg_status; hbool_t send_nack = FALSE; + int64_t file_context_id; int64_t data_size; int64_t file_offset; - int64_t file_context_id; + int64_t subfile_idx; int64_t stripe_id; haddr_t sf_eof; #ifdef H5FD_IOC_COLLECT_STATS @@ -714,10 +712,12 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, HDassert(msg); + file_context_id = msg->context_id; + /* Retrieve the fields of the RPC message for the write operation */ - data_size = msg->header[0]; - file_offset = msg->header[1]; - file_context_id = msg->header[2]; + data_size = msg->header[0]; + file_offset = msg->header[1]; + subfile_idx = msg->header[2]; if (data_size < 0) { send_nack = TRUE; @@ -746,7 +746,7 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, #ifdef H5_SUBFILING_DEBUG H5_subfiling_log(file_context_id, "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, queue_delay = %lf seconds\n", - subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + ioc_idx, __func__, source, data_size, file_offset, t_queue_delay); #endif #endif @@ -764,12 +764,12 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, * allows us to distinguish between multiple concurrent * writes from a single rank. */ - HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= WRITE_TAG_BASE)); - rcv_tag = (int)(counter % (INT_MAX - WRITE_TAG_BASE)); - rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - WRITE_TAG_BASE); - rcv_tag += WRITE_TAG_BASE; + HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= IO_TAG_BASE)); + rcv_tag = (int)(counter % (INT_MAX - IO_TAG_BASE)); + rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - IO_TAG_BASE); + rcv_tag += IO_TAG_BASE; - if (send_ack_to_client(rcv_tag, source, subfile_rank, WRITE_INDEP_ACK, comm) < 0) + if (send_ack_to_client(rcv_tag, source, ioc_idx, WRITE_INDEP_ACK, comm) < 0) H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send ACK to client"); /* Receive data from client */ @@ -794,13 +794,14 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, t_start = t_end; #ifdef H5_SUBFILING_DEBUG - H5_subfiling_log(file_context_id, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", - subfile_rank, __func__, data_size, source, mpi_code); + H5_subfiling_log(file_context_id, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", ioc_idx, + __func__, data_size, source, mpi_code); #endif #endif - sf_fid = sf_context->sf_fid; + HDassert(subfile_idx < sf_context->sf_num_fids); + sf_fid = sf_context->sf_fids[subfile_idx]; #ifdef H5_SUBFILING_DEBUG if (sf_fid < 0) @@ -810,7 +811,7 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, if (sf_fid >= 0) { /* Actually write data received from client into subfile */ - if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, subfile_rank)) < 0) + if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, ioc_idx)) < 0) H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "write function(FID=%d, Source=%d) returned an error (%d)", sf_fid, source, write_ret); @@ -834,10 +835,17 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, H5FD_ioc_end_thread_exclusive(); + /* + * Send a message back to the client that the I/O call has + * completed and it is safe to return from the write call + */ + if (MPI_SUCCESS != (mpi_code = MPI_Send(&rcv_tag, 1, MPI_INT, source, WRITE_DATA_DONE, comm))) + H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code); + done: if (send_nack) { /* Send NACK back to client so client can handle failure gracefully */ - if (send_nack_to_client(source, subfile_rank, WRITE_INDEP_ACK, comm) < 0) + if (send_nack_to_client(source, ioc_idx, WRITE_INDEP_ACK, comm) < 0) H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send NACK to client"); } @@ -867,13 +875,16 @@ done: *------------------------------------------------------------------------- */ static int -ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +ioc_file_queue_read_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter) { subfiling_context_t *sf_context = NULL; hbool_t send_empty_buf = TRUE; + hbool_t send_nack = FALSE; + hbool_t need_data_tag = FALSE; + int64_t file_context_id; int64_t data_size; int64_t file_offset; - int64_t file_context_id; + int64_t subfile_idx; #ifdef H5FD_IOC_COLLECT_STATS double t_start; double t_end; @@ -881,6 +892,7 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, double t_queue_delay; #endif char *send_buf = NULL; + int send_tag; int sf_fid; int read_ret; int mpi_code; @@ -888,17 +900,37 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, HDassert(msg); - /* Retrieve the fields of the RPC message for the read operation */ - data_size = msg->header[0]; - file_offset = msg->header[1]; - file_context_id = msg->header[2]; - - if (data_size < 0) - H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read"); + file_context_id = msg->context_id; sf_context = H5_get_subfiling_object(file_context_id); HDassert(sf_context); + /* + * If we are using 1 subfile per IOC, we can optimize reads + * a little since each read will go to a separate IOC and we + * won't be in danger of data being received in an + * unpredictable order. However, if some IOCs own more than + * 1 subfile, we need to associate each read with a unique + * message tag to make sure the data is received in the + * correct order. + */ + need_data_tag = sf_context->sf_num_subfiles != sf_context->topology->n_io_concentrators; + if (!need_data_tag) + send_tag = READ_INDEP_DATA; + + /* Retrieve the fields of the RPC message for the read operation */ + data_size = msg->header[0]; + file_offset = msg->header[1]; + subfile_idx = msg->header[2]; + + if (data_size < 0) { + if (need_data_tag) { + send_nack = TRUE; + send_empty_buf = FALSE; + } + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read"); + } + /* Flag that we've attempted to read data from the file */ sf_context->sf_read_count++; @@ -911,22 +943,48 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, #ifdef H5_SUBFILING_DEBUG H5_subfiling_log(file_context_id, - "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld queue_delay=%lf seconds\n", - subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld queue_delay=%lf seconds\n", ioc_idx, + __func__, source, data_size, file_offset, t_queue_delay); #endif #endif /* Allocate space to send data read from file to client */ - if (NULL == (send_buf = HDmalloc((size_t)data_size))) + if (NULL == (send_buf = HDmalloc((size_t)data_size))) { + if (need_data_tag) { + send_nack = TRUE; + send_empty_buf = FALSE; + } H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate send buffer for data"); + } + + if (need_data_tag) { + /* + * Calculate message tag for the client to use for receiving + * data, then send an ACK message to the client with the + * calculated message tag. This calculated message tag + * allows us to distinguish between multiple concurrent + * reads from a single rank, which can happen when a rank + * owns multiple subfiles. + */ + HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= IO_TAG_BASE)); + send_tag = (int)(counter % (INT_MAX - IO_TAG_BASE)); + send_tag %= (*H5FD_IOC_tag_ub_val_ptr - IO_TAG_BASE); + send_tag += IO_TAG_BASE; + + if (send_ack_to_client(send_tag, source, ioc_idx, READ_INDEP_ACK, comm) < 0) { + send_empty_buf = FALSE; + H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "couldn't send ACK to client"); + } + } - sf_fid = sf_context->sf_fid; + /* Read data from the subfile */ + HDassert(subfile_idx < sf_context->sf_num_fids); + sf_fid = sf_context->sf_fids[subfile_idx]; if (sf_fid < 0) H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "subfile file descriptor %d is invalid", sf_fid); - /* Read data from the subfile */ - if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, subfile_rank)) < 0) { + if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, ioc_idx)) < 0) { H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, read_ret, "read function(FID=%d, Source=%d) returned an error (%d)", sf_fid, source, read_ret); @@ -936,8 +994,7 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, /* Send read data to the client */ H5_CHECK_OVERFLOW(data_size, int64_t, int); - if (MPI_SUCCESS != - (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm))) + if (MPI_SUCCESS != (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, send_tag, comm))) H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code); #ifdef H5FD_IOC_COLLECT_STATS @@ -947,19 +1004,24 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, sf_queue_delay_time += t_queue_delay; #ifdef H5_SUBFILING_DEBUG - H5_subfiling_log(sf_context->sf_context_id, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank, + H5_subfiling_log(sf_context->sf_context_id, "[ioc(%d)] MPI_Send to source(%d) completed\n", ioc_idx, source); #endif #endif done: + if (need_data_tag && send_nack) { + /* Send NACK back to client so client can handle failure gracefully */ + if (send_nack_to_client(source, ioc_idx, READ_INDEP_ACK, comm) < 0) + H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_READERROR, -1, "couldn't send NACK to client"); + } if (send_empty_buf) { /* * Send an empty message back to client on failure. The client will * likely get a message truncation error, but at least shouldn't hang. */ - if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, READ_INDEP_DATA, comm))) + if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, send_tag, comm))) H5_SUBFILING_MPI_DONE_ERROR(-1, "MPI_Send failed", mpi_code); } @@ -978,7 +1040,7 @@ being thread safe. */ static int -ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx) { ssize_t bytes_remaining = (ssize_t)data_size; ssize_t bytes_written = 0; @@ -986,7 +1048,7 @@ ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data int ret_value = 0; #ifndef H5FD_IOC_DEBUG - (void)subfile_rank; + (void)ioc_idx; #endif HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset)); @@ -1000,7 +1062,7 @@ ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data bytes_remaining -= bytes_written; #ifdef H5FD_IOC_DEBUG - HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank, + HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", ioc_idx, __func__, bytes_written, bytes_remaining, file_offset); #endif @@ -1024,7 +1086,7 @@ done: } /* end ioc_file_write_data() */ static int -ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx) { useconds_t delay = 100; ssize_t bytes_remaining = (ssize_t)data_size; @@ -1034,7 +1096,7 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_ int ret_value = 0; #ifndef H5FD_IOC_DEBUG - (void)subfile_rank; + (void)ioc_idx; #endif HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset)); @@ -1052,7 +1114,7 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_ bytes_remaining -= bytes_read; #ifdef H5FD_IOC_DEBUG - HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank, + HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", ioc_idx, __func__, bytes_read, bytes_remaining, file_offset); #endif @@ -1069,8 +1131,8 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_ else { if (retries == 0) { #ifdef H5FD_IOC_DEBUG - HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", subfile_rank, - __func__, file_offset, data_size); + HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", ioc_idx, __func__, + file_offset, data_size); #endif H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "HDpread failed"); @@ -1087,19 +1149,40 @@ done: } /* end ioc_file_read_data() */ static int -ioc_file_truncate(int fd, int64_t length, int subfile_rank) +ioc_file_truncate(sf_work_request_t *msg) { - int ret_value = 0; + subfiling_context_t *sf_context = NULL; + int64_t file_context_id; + int64_t length; + int64_t subfile_idx; + int fd; + int ioc_idx; + int ret_value = 0; + + HDassert(msg); + + file_context_id = msg->context_id; + ioc_idx = msg->ioc_idx; + + length = msg->header[0]; + subfile_idx = msg->header[1]; #ifndef H5FD_IOC_DEBUG - (void)subfile_rank; + (void)ioc_idx; #endif + if (NULL == (sf_context = H5_get_subfiling_object(file_context_id))) + H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context"); + + HDassert(subfile_idx < sf_context->sf_num_fids); + + fd = sf_context->sf_fids[subfile_idx]; + if (HDftruncate(fd, (off_t)length) != 0) H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SEEKERROR, -1, "HDftruncate failed"); #ifdef H5FD_IOC_DEBUG - HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", subfile_rank, __func__, + HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", ioc_idx, __func__, (long long)length, errno); HDfflush(stdout); #endif @@ -1111,7 +1194,7 @@ done: /*------------------------------------------------------------------------- * Function: ioc_file_report_eof * - * Purpose: Determine the target sub-file's eof and report this value + * Purpose: Determine the target subfile's eof and report this value * to the requesting rank. * * Notes: This function will have to be reworked once we solve @@ -1131,40 +1214,48 @@ done: */ static int -ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +ioc_file_report_eof(sf_work_request_t *msg, MPI_Comm comm) { subfiling_context_t *sf_context = NULL; h5_stat_t sb; int64_t eof_req_reply[3]; int64_t file_context_id; + int64_t subfile_idx; int fd; + int source; + int ioc_idx; int mpi_code; int ret_value = 0; HDassert(msg); - /* first get the EOF of the target file. */ + file_context_id = msg->context_id; + source = msg->source; + ioc_idx = msg->ioc_idx; - file_context_id = msg->header[2]; + subfile_idx = msg->header[0]; if (NULL == (sf_context = H5_get_subfiling_object(file_context_id))) H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context"); - fd = sf_context->sf_fid; + HDassert(subfile_idx < sf_context->sf_num_fids); + + fd = sf_context->sf_fids[subfile_idx]; if (HDfstat(fd, &sb) < 0) H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SYSERRSTR, -1, "HDfstat failed"); - eof_req_reply[0] = (int64_t)subfile_rank; + eof_req_reply[0] = (int64_t)ioc_idx; eof_req_reply[1] = (int64_t)(sb.st_size); - eof_req_reply[2] = 0; /* not used */ + eof_req_reply[2] = subfile_idx; #ifdef H5_SUBFILING_DEBUG H5_subfiling_log(file_context_id, "%s: reporting file EOF as %" PRId64 ".", __func__, eof_req_reply[1]); #endif /* return the subfile EOF to the querying rank */ - if (MPI_SUCCESS != (mpi_code = MPI_Send(eof_req_reply, 3, MPI_INT64_T, source, GET_EOF_COMPLETED, comm))) + if (MPI_SUCCESS != + (mpi_code = MPI_Send(eof_req_reply, 1, H5_subfiling_rpc_msg_type, source, GET_EOF_COMPLETED, comm))) H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send", mpi_code); done: @@ -1272,12 +1363,13 @@ ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr) atomic_fetch_add(&ioc_data->sf_io_ops_pending, 1); #ifdef H5_SUBFILING_DEBUG - H5_subfiling_log(wk_req_ptr->context_id, - "%s: request %d queued. op = %d, offset/len = %lld/%lld, q-ed/disp/ops_pend = %d/%d/%d.", - __func__, entry_ptr->counter, (entry_ptr->wk_req.tag), - (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]), - ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress, - atomic_load(&ioc_data->sf_io_ops_pending)); + H5_subfiling_log( + wk_req_ptr->context_id, + "%s: request %d queued. op = %d, req = (%lld, %lld, %lld), q-ed/disp/ops_pend = %d/%d/%d.", __func__, + entry_ptr->counter, (entry_ptr->wk_req.tag), (long long)(entry_ptr->wk_req.header[0]), + (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[2]), + ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress, + atomic_load(&ioc_data->sf_io_ops_pending)); #endif HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len); @@ -1478,14 +1570,14 @@ ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock) entry_ptr->thread_wk.args = entry_ptr; #ifdef H5_SUBFILING_DEBUG - H5_subfiling_log(entry_ptr->wk_req.context_id, - "%s: request %d dispatched. op = %d, offset/len = %lld/%lld, " - "q-ed/disp/ops_pend = %d/%d/%d.", - __func__, entry_ptr->counter, (entry_ptr->wk_req.tag), - (long long)(entry_ptr->wk_req.header[1]), - (long long)(entry_ptr->wk_req.header[0]), ioc_data->io_queue.num_pending, - ioc_data->io_queue.num_in_progress, - atomic_load(&ioc_data->sf_io_ops_pending)); + H5_subfiling_log( + entry_ptr->wk_req.context_id, + "%s: request %d dispatched. op = %d, req = (%lld, %lld, %lld), " + "q-ed/disp/ops_pend = %d/%d/%d.", + __func__, entry_ptr->counter, (entry_ptr->wk_req.tag), + (long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]), + (long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending, + ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending)); #endif #ifdef H5FD_IOC_COLLECT_STATS @@ -1564,12 +1656,12 @@ ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_pt #ifdef H5_SUBFILING_DEBUG H5_subfiling_log(entry_ptr->wk_req.context_id, - "%s: request %d completed with ret %d. op = %d, offset/len = %lld/%lld, " + "%s: request %d completed with ret %d. op = %d, req = (%lld, %lld, %lld), " "q-ed/disp/ops_pend = %d/%d/%d.", __func__, entry_ptr->counter, entry_ptr->wk_ret, (entry_ptr->wk_req.tag), - (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]), - ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress, - atomic_load(&ioc_data->sf_io_ops_pending)); + (long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]), + (long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending, + ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending)); /* * If this I/O request is a truncate or "get eof" op, make sure |