summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfiling/H5FDioc_threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfiling/H5FDioc_threads.c')
-rw-r--r--src/H5FDsubfiling/H5FDioc_threads.c300
1 files changed, 196 insertions, 104 deletions
diff --git a/src/H5FDsubfiling/H5FDioc_threads.c b/src/H5FDsubfiling/H5FDioc_threads.c
index 813fb3f..b3e8ebc 100644
--- a/src/H5FDsubfiling/H5FDioc_threads.c
+++ b/src/H5FDsubfiling/H5FDioc_threads.c
@@ -72,16 +72,16 @@ static double sf_queue_delay_time = 0.0;
static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg);
static int ioc_main(ioc_data_t *ioc_data);
-static int ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm,
+static int ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm,
uint32_t counter);
-static int ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+static int ioc_file_queue_read_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm,
+ uint32_t counter);
static int ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size,
- int subfile_rank);
-static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size,
- int subfile_rank);
-static int ioc_file_truncate(int fd, int64_t length, int subfile_rank);
-static int ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
+ int ioc_idx);
+static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx);
+static int ioc_file_truncate(sf_work_request_t *msg);
+static int ioc_file_report_eof(sf_work_request_t *msg, MPI_Comm comm);
static ioc_io_queue_entry_t *ioc_io_queue_alloc_entry(void);
static void ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr);
@@ -156,6 +156,8 @@ initialize_ioc_threads(void *_sf_context)
#endif
};
+ sf_context->ioc_data = ioc_data;
+
/* Initialize atomic vars */
atomic_init(&ioc_data->sf_ioc_ready, 0);
atomic_init(&ioc_data->sf_shutdown_flag, 0);
@@ -194,7 +196,7 @@ initialize_ioc_threads(void *_sf_context)
t_end = MPI_Wtime();
#ifdef H5FD_IOC_DEBUG
- if (sf_context->topology->subfile_rank == 0) {
+ if (sf_context->topology->ioc_idx == 0) {
HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start));
HDfflush(stdout);
}
@@ -202,8 +204,6 @@ initialize_ioc_threads(void *_sf_context)
#endif
- sf_context->ioc_data = ioc_data;
-
done:
H5_SUBFILING_FUNC_LEAVE;
}
@@ -245,6 +245,7 @@ finalize_ioc_threads(void *_sf_context)
ioc_data->io_queue.num_failed);
HDfree(ioc_data);
+ sf_context->ioc_data = NULL;
H5_SUBFILING_FUNC_LEAVE;
}
@@ -346,7 +347,6 @@ ioc_main(ioc_data_t *ioc_data)
{
subfiling_context_t *context = NULL;
sf_work_request_t wk_req;
- int subfile_rank;
int shutdown_requested;
int ret_value = 0;
@@ -362,8 +362,6 @@ ioc_main(ioc_data_t *ioc_data)
* represent an open file).
*/
- subfile_rank = context->sf_group_rank;
-
/* tell initialize_ioc_threads() that ioc_main() is ready to enter its main loop */
atomic_store(&ioc_data->sf_ioc_ready, 1);
@@ -415,11 +413,11 @@ ioc_main(ioc_data_t *ioc_data)
queue_start_time = MPI_Wtime();
- wk_req.tag = tag;
- wk_req.source = source;
- wk_req.subfile_rank = subfile_rank;
- wk_req.context_id = ioc_data->sf_context_id;
- wk_req.start_time = queue_start_time;
+ wk_req.tag = tag;
+ wk_req.source = source;
+ wk_req.ioc_idx = context->topology->ioc_idx;
+ wk_req.context_id = ioc_data->sf_context_id;
+ wk_req.start_time = queue_start_time;
ioc_io_queue_add_entry(ioc_data, &wk_req);
@@ -506,7 +504,7 @@ handle_work_request(void *arg)
subfiling_context_t *sf_context = NULL;
sf_work_request_t *msg = &(q_entry_ptr->wk_req);
ioc_data_t *ioc_data = NULL;
- int64_t file_context_id = msg->header[2];
+ int64_t file_context_id = msg->context_id;
int op_ret;
hg_thread_ret_t ret_value = 0;
@@ -524,27 +522,27 @@ handle_work_request(void *arg)
switch (msg->tag) {
case WRITE_INDEP:
- op_ret = ioc_file_queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm,
+ op_ret = ioc_file_queue_write_indep(msg, msg->ioc_idx, msg->source, sf_context->sf_data_comm,
q_entry_ptr->counter);
break;
case READ_INDEP:
- op_ret = ioc_file_queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm);
+ op_ret = ioc_file_queue_read_indep(msg, msg->ioc_idx, msg->source, sf_context->sf_data_comm,
+ q_entry_ptr->counter);
break;
case TRUNC_OP:
- op_ret = ioc_file_truncate(sf_context->sf_fid, q_entry_ptr->wk_req.header[0],
- sf_context->topology->subfile_rank);
+ op_ret = ioc_file_truncate(msg);
break;
case GET_EOF_OP:
- op_ret = ioc_file_report_eof(msg, msg->subfile_rank, msg->source, sf_context->sf_eof_comm);
+ op_ret = ioc_file_report_eof(msg, sf_context->sf_eof_comm);
break;
default:
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id, "%s: IOC %d received unknown message with tag %x from rank %d",
- __func__, msg->subfile_rank, msg->tag, msg->source);
+ __func__, msg->ioc_idx, msg->tag, msg->source);
#endif
op_ret = -1;
@@ -555,11 +553,11 @@ handle_work_request(void *arg)
if (op_ret < 0) {
#ifdef H5_SUBFILING_DEBUG
- H5_subfiling_log(
- file_context_id,
- "%s: IOC %d request(%s) filename=%s from rank(%d), size=%ld, offset=%ld FAILED with ret %d",
- __func__, msg->subfile_rank, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename,
- msg->source, msg->header[0], msg->header[1], op_ret);
+ H5_subfiling_log(file_context_id,
+ "%s: IOC %d request(%s) from rank(%d), (%" PRId64 ", %" PRId64 ", %" PRId64
+ ") FAILED with ret %d",
+ __func__, msg->ioc_idx, translate_opcode((io_op_t)msg->tag), msg->source,
+ msg->header[0], msg->header[1], msg->header[2], op_ret);
#endif
q_entry_ptr->wk_ret = op_ret;
@@ -686,15 +684,15 @@ from the thread pool threads...
*-------------------------------------------------------------------------
*/
static int
-ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm,
- uint32_t counter)
+ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter)
{
subfiling_context_t *sf_context = NULL;
MPI_Status msg_status;
hbool_t send_nack = FALSE;
+ int64_t file_context_id;
int64_t data_size;
int64_t file_offset;
- int64_t file_context_id;
+ int64_t subfile_idx;
int64_t stripe_id;
haddr_t sf_eof;
#ifdef H5FD_IOC_COLLECT_STATS
@@ -714,10 +712,12 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
HDassert(msg);
+ file_context_id = msg->context_id;
+
/* Retrieve the fields of the RPC message for the write operation */
- data_size = msg->header[0];
- file_offset = msg->header[1];
- file_context_id = msg->header[2];
+ data_size = msg->header[0];
+ file_offset = msg->header[1];
+ subfile_idx = msg->header[2];
if (data_size < 0) {
send_nack = TRUE;
@@ -746,7 +746,7 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id,
"[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, queue_delay = %lf seconds\n",
- subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
+ ioc_idx, __func__, source, data_size, file_offset, t_queue_delay);
#endif
#endif
@@ -764,12 +764,12 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
* allows us to distinguish between multiple concurrent
* writes from a single rank.
*/
- HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= WRITE_TAG_BASE));
- rcv_tag = (int)(counter % (INT_MAX - WRITE_TAG_BASE));
- rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - WRITE_TAG_BASE);
- rcv_tag += WRITE_TAG_BASE;
+ HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= IO_TAG_BASE));
+ rcv_tag = (int)(counter % (INT_MAX - IO_TAG_BASE));
+ rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - IO_TAG_BASE);
+ rcv_tag += IO_TAG_BASE;
- if (send_ack_to_client(rcv_tag, source, subfile_rank, WRITE_INDEP_ACK, comm) < 0)
+ if (send_ack_to_client(rcv_tag, source, ioc_idx, WRITE_INDEP_ACK, comm) < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send ACK to client");
/* Receive data from client */
@@ -794,13 +794,14 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
t_start = t_end;
#ifdef H5_SUBFILING_DEBUG
- H5_subfiling_log(file_context_id, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n",
- subfile_rank, __func__, data_size, source, mpi_code);
+ H5_subfiling_log(file_context_id, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", ioc_idx,
+ __func__, data_size, source, mpi_code);
#endif
#endif
- sf_fid = sf_context->sf_fid;
+ HDassert(subfile_idx < sf_context->sf_num_fids);
+ sf_fid = sf_context->sf_fids[subfile_idx];
#ifdef H5_SUBFILING_DEBUG
if (sf_fid < 0)
@@ -810,7 +811,7 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
if (sf_fid >= 0) {
/* Actually write data received from client into subfile */
- if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, subfile_rank)) < 0)
+ if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, ioc_idx)) < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1,
"write function(FID=%d, Source=%d) returned an error (%d)", sf_fid,
source, write_ret);
@@ -834,10 +835,17 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
H5FD_ioc_end_thread_exclusive();
+ /*
+ * Send a message back to the client that the I/O call has
+ * completed and it is safe to return from the write call
+ */
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(&rcv_tag, 1, MPI_INT, source, WRITE_DATA_DONE, comm)))
+ H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code);
+
done:
if (send_nack) {
/* Send NACK back to client so client can handle failure gracefully */
- if (send_nack_to_client(source, subfile_rank, WRITE_INDEP_ACK, comm) < 0)
+ if (send_nack_to_client(source, ioc_idx, WRITE_INDEP_ACK, comm) < 0)
H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send NACK to client");
}
@@ -867,13 +875,16 @@ done:
*-------------------------------------------------------------------------
*/
static int
-ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+ioc_file_queue_read_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter)
{
subfiling_context_t *sf_context = NULL;
hbool_t send_empty_buf = TRUE;
+ hbool_t send_nack = FALSE;
+ hbool_t need_data_tag = FALSE;
+ int64_t file_context_id;
int64_t data_size;
int64_t file_offset;
- int64_t file_context_id;
+ int64_t subfile_idx;
#ifdef H5FD_IOC_COLLECT_STATS
double t_start;
double t_end;
@@ -881,6 +892,7 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
double t_queue_delay;
#endif
char *send_buf = NULL;
+ int send_tag;
int sf_fid;
int read_ret;
int mpi_code;
@@ -888,17 +900,37 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
HDassert(msg);
- /* Retrieve the fields of the RPC message for the read operation */
- data_size = msg->header[0];
- file_offset = msg->header[1];
- file_context_id = msg->header[2];
-
- if (data_size < 0)
- H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read");
+ file_context_id = msg->context_id;
sf_context = H5_get_subfiling_object(file_context_id);
HDassert(sf_context);
+ /*
+ * If we are using 1 subfile per IOC, we can optimize reads
+ * a little since each read will go to a separate IOC and we
+ * won't be in danger of data being received in an
+ * unpredictable order. However, if some IOCs own more than
+ * 1 subfile, we need to associate each read with a unique
+ * message tag to make sure the data is received in the
+ * correct order.
+ */
+ need_data_tag = sf_context->sf_num_subfiles != sf_context->topology->n_io_concentrators;
+ if (!need_data_tag)
+ send_tag = READ_INDEP_DATA;
+
+ /* Retrieve the fields of the RPC message for the read operation */
+ data_size = msg->header[0];
+ file_offset = msg->header[1];
+ subfile_idx = msg->header[2];
+
+ if (data_size < 0) {
+ if (need_data_tag) {
+ send_nack = TRUE;
+ send_empty_buf = FALSE;
+ }
+ H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read");
+ }
+
/* Flag that we've attempted to read data from the file */
sf_context->sf_read_count++;
@@ -911,22 +943,48 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id,
- "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld queue_delay=%lf seconds\n",
- subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
+ "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld queue_delay=%lf seconds\n", ioc_idx,
+ __func__, source, data_size, file_offset, t_queue_delay);
#endif
#endif
/* Allocate space to send data read from file to client */
- if (NULL == (send_buf = HDmalloc((size_t)data_size)))
+ if (NULL == (send_buf = HDmalloc((size_t)data_size))) {
+ if (need_data_tag) {
+ send_nack = TRUE;
+ send_empty_buf = FALSE;
+ }
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate send buffer for data");
+ }
+
+ if (need_data_tag) {
+ /*
+ * Calculate message tag for the client to use for receiving
+ * data, then send an ACK message to the client with the
+ * calculated message tag. This calculated message tag
+ * allows us to distinguish between multiple concurrent
+ * reads from a single rank, which can happen when a rank
+ * owns multiple subfiles.
+ */
+ HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= IO_TAG_BASE));
+ send_tag = (int)(counter % (INT_MAX - IO_TAG_BASE));
+ send_tag %= (*H5FD_IOC_tag_ub_val_ptr - IO_TAG_BASE);
+ send_tag += IO_TAG_BASE;
+
+ if (send_ack_to_client(send_tag, source, ioc_idx, READ_INDEP_ACK, comm) < 0) {
+ send_empty_buf = FALSE;
+ H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "couldn't send ACK to client");
+ }
+ }
- sf_fid = sf_context->sf_fid;
+ /* Read data from the subfile */
+ HDassert(subfile_idx < sf_context->sf_num_fids);
+ sf_fid = sf_context->sf_fids[subfile_idx];
if (sf_fid < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "subfile file descriptor %d is invalid", sf_fid);
- /* Read data from the subfile */
- if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, subfile_rank)) < 0) {
+ if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, ioc_idx)) < 0) {
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, read_ret,
"read function(FID=%d, Source=%d) returned an error (%d)", sf_fid, source,
read_ret);
@@ -936,8 +994,7 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
/* Send read data to the client */
H5_CHECK_OVERFLOW(data_size, int64_t, int);
- if (MPI_SUCCESS !=
- (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, send_tag, comm)))
H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code);
#ifdef H5FD_IOC_COLLECT_STATS
@@ -947,19 +1004,24 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
sf_queue_delay_time += t_queue_delay;
#ifdef H5_SUBFILING_DEBUG
- H5_subfiling_log(sf_context->sf_context_id, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank,
+ H5_subfiling_log(sf_context->sf_context_id, "[ioc(%d)] MPI_Send to source(%d) completed\n", ioc_idx,
source);
#endif
#endif
done:
+ if (need_data_tag && send_nack) {
+ /* Send NACK back to client so client can handle failure gracefully */
+ if (send_nack_to_client(source, ioc_idx, READ_INDEP_ACK, comm) < 0)
+ H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_READERROR, -1, "couldn't send NACK to client");
+ }
if (send_empty_buf) {
/*
* Send an empty message back to client on failure. The client will
* likely get a message truncation error, but at least shouldn't hang.
*/
- if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, READ_INDEP_DATA, comm)))
+ if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, send_tag, comm)))
H5_SUBFILING_MPI_DONE_ERROR(-1, "MPI_Send failed", mpi_code);
}
@@ -978,7 +1040,7 @@ being thread safe.
*/
static int
-ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx)
{
ssize_t bytes_remaining = (ssize_t)data_size;
ssize_t bytes_written = 0;
@@ -986,7 +1048,7 @@ ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data
int ret_value = 0;
#ifndef H5FD_IOC_DEBUG
- (void)subfile_rank;
+ (void)ioc_idx;
#endif
HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset));
@@ -1000,7 +1062,7 @@ ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data
bytes_remaining -= bytes_written;
#ifdef H5FD_IOC_DEBUG
- HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank,
+ HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", ioc_idx,
__func__, bytes_written, bytes_remaining, file_offset);
#endif
@@ -1024,7 +1086,7 @@ done:
} /* end ioc_file_write_data() */
static int
-ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx)
{
useconds_t delay = 100;
ssize_t bytes_remaining = (ssize_t)data_size;
@@ -1034,7 +1096,7 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_
int ret_value = 0;
#ifndef H5FD_IOC_DEBUG
- (void)subfile_rank;
+ (void)ioc_idx;
#endif
HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset));
@@ -1052,7 +1114,7 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_
bytes_remaining -= bytes_read;
#ifdef H5FD_IOC_DEBUG
- HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank,
+ HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", ioc_idx,
__func__, bytes_read, bytes_remaining, file_offset);
#endif
@@ -1069,8 +1131,8 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_
else {
if (retries == 0) {
#ifdef H5FD_IOC_DEBUG
- HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", subfile_rank,
- __func__, file_offset, data_size);
+ HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", ioc_idx, __func__,
+ file_offset, data_size);
#endif
H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "HDpread failed");
@@ -1087,19 +1149,40 @@ done:
} /* end ioc_file_read_data() */
static int
-ioc_file_truncate(int fd, int64_t length, int subfile_rank)
+ioc_file_truncate(sf_work_request_t *msg)
{
- int ret_value = 0;
+ subfiling_context_t *sf_context = NULL;
+ int64_t file_context_id;
+ int64_t length;
+ int64_t subfile_idx;
+ int fd;
+ int ioc_idx;
+ int ret_value = 0;
+
+ HDassert(msg);
+
+ file_context_id = msg->context_id;
+ ioc_idx = msg->ioc_idx;
+
+ length = msg->header[0];
+ subfile_idx = msg->header[1];
#ifndef H5FD_IOC_DEBUG
- (void)subfile_rank;
+ (void)ioc_idx;
#endif
+ if (NULL == (sf_context = H5_get_subfiling_object(file_context_id)))
+ H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context");
+
+ HDassert(subfile_idx < sf_context->sf_num_fids);
+
+ fd = sf_context->sf_fids[subfile_idx];
+
if (HDftruncate(fd, (off_t)length) != 0)
H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SEEKERROR, -1, "HDftruncate failed");
#ifdef H5FD_IOC_DEBUG
- HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", subfile_rank, __func__,
+ HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", ioc_idx, __func__,
(long long)length, errno);
HDfflush(stdout);
#endif
@@ -1111,7 +1194,7 @@ done:
/*-------------------------------------------------------------------------
* Function: ioc_file_report_eof
*
- * Purpose: Determine the target sub-file's eof and report this value
+ * Purpose: Determine the target subfile's eof and report this value
* to the requesting rank.
*
* Notes: This function will have to be reworked once we solve
@@ -1131,40 +1214,48 @@ done:
*/
static int
-ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
+ioc_file_report_eof(sf_work_request_t *msg, MPI_Comm comm)
{
subfiling_context_t *sf_context = NULL;
h5_stat_t sb;
int64_t eof_req_reply[3];
int64_t file_context_id;
+ int64_t subfile_idx;
int fd;
+ int source;
+ int ioc_idx;
int mpi_code;
int ret_value = 0;
HDassert(msg);
- /* first get the EOF of the target file. */
+ file_context_id = msg->context_id;
+ source = msg->source;
+ ioc_idx = msg->ioc_idx;
- file_context_id = msg->header[2];
+ subfile_idx = msg->header[0];
if (NULL == (sf_context = H5_get_subfiling_object(file_context_id)))
H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context");
- fd = sf_context->sf_fid;
+ HDassert(subfile_idx < sf_context->sf_num_fids);
+
+ fd = sf_context->sf_fids[subfile_idx];
if (HDfstat(fd, &sb) < 0)
H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SYSERRSTR, -1, "HDfstat failed");
- eof_req_reply[0] = (int64_t)subfile_rank;
+ eof_req_reply[0] = (int64_t)ioc_idx;
eof_req_reply[1] = (int64_t)(sb.st_size);
- eof_req_reply[2] = 0; /* not used */
+ eof_req_reply[2] = subfile_idx;
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id, "%s: reporting file EOF as %" PRId64 ".", __func__, eof_req_reply[1]);
#endif
/* return the subfile EOF to the querying rank */
- if (MPI_SUCCESS != (mpi_code = MPI_Send(eof_req_reply, 3, MPI_INT64_T, source, GET_EOF_COMPLETED, comm)))
+ if (MPI_SUCCESS !=
+ (mpi_code = MPI_Send(eof_req_reply, 1, H5_subfiling_rpc_msg_type, source, GET_EOF_COMPLETED, comm)))
H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send", mpi_code);
done:
@@ -1272,12 +1363,13 @@ ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr)
atomic_fetch_add(&ioc_data->sf_io_ops_pending, 1);
#ifdef H5_SUBFILING_DEBUG
- H5_subfiling_log(wk_req_ptr->context_id,
- "%s: request %d queued. op = %d, offset/len = %lld/%lld, q-ed/disp/ops_pend = %d/%d/%d.",
- __func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
- (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]),
- ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
- atomic_load(&ioc_data->sf_io_ops_pending));
+ H5_subfiling_log(
+ wk_req_ptr->context_id,
+ "%s: request %d queued. op = %d, req = (%lld, %lld, %lld), q-ed/disp/ops_pend = %d/%d/%d.", __func__,
+ entry_ptr->counter, (entry_ptr->wk_req.tag), (long long)(entry_ptr->wk_req.header[0]),
+ (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[2]),
+ ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
+ atomic_load(&ioc_data->sf_io_ops_pending));
#endif
HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);
@@ -1478,14 +1570,14 @@ ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock)
entry_ptr->thread_wk.args = entry_ptr;
#ifdef H5_SUBFILING_DEBUG
- H5_subfiling_log(entry_ptr->wk_req.context_id,
- "%s: request %d dispatched. op = %d, offset/len = %lld/%lld, "
- "q-ed/disp/ops_pend = %d/%d/%d.",
- __func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
- (long long)(entry_ptr->wk_req.header[1]),
- (long long)(entry_ptr->wk_req.header[0]), ioc_data->io_queue.num_pending,
- ioc_data->io_queue.num_in_progress,
- atomic_load(&ioc_data->sf_io_ops_pending));
+ H5_subfiling_log(
+ entry_ptr->wk_req.context_id,
+ "%s: request %d dispatched. op = %d, req = (%lld, %lld, %lld), "
+ "q-ed/disp/ops_pend = %d/%d/%d.",
+ __func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
+ (long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]),
+ (long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending,
+ ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending));
#endif
#ifdef H5FD_IOC_COLLECT_STATS
@@ -1564,12 +1656,12 @@ ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_pt
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(entry_ptr->wk_req.context_id,
- "%s: request %d completed with ret %d. op = %d, offset/len = %lld/%lld, "
+ "%s: request %d completed with ret %d. op = %d, req = (%lld, %lld, %lld), "
"q-ed/disp/ops_pend = %d/%d/%d.",
__func__, entry_ptr->counter, entry_ptr->wk_ret, (entry_ptr->wk_req.tag),
- (long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]),
- ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
- atomic_load(&ioc_data->sf_io_ops_pending));
+ (long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]),
+ (long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending,
+ ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending));
/*
* If this I/O request is a truncate or "get eof" op, make sure