From b5120771b1076ff32421b5d3cf71f76a25b2f8c7 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 14 Dec 2018 17:18:32 -0600 Subject: First commit of the ccio branch, adding changes for the topology-aware MPIO VFD. The topology-aware VFD is part of the custom-collective I/O (CCIO) version of the MPIO VFD. All CCIO modifications rely on a flattened-buffer (flatbuf) representation of the data in file and memory space. The CCIO options are enabled by environment variables and work by rerouting the usual collective-I/O calls into new select_read and select_write VFD callback functions. The flatbuf selections are performed using existing HDF5 functionality for dataspace selections. --- src/H5Dmpio.c | 103 +- src/H5FDcore.c | 2 + src/H5FDdirect.c | 2 + src/H5FDfamily.c | 2 + src/H5FDint.c | 75 + src/H5FDlog.c | 2 + src/H5FDmpio.c | 5109 ++++++++++++++++++++++++++++++++++++++++++++++- src/H5FDmpio_topology.h | 922 +++++++++ src/H5FDmulti.c | 2 + src/H5FDprivate.h | 3 +- src/H5FDpublic.h | 30 +- src/H5FDsec2.c | 2 + src/H5FDstdio.c | 2 + src/H5Fio.c | 128 +- src/H5Fprivate.h | 16 +- src/H5Sall.c | 9 +- src/H5Shyper.c | 29 +- src/H5Smpio.c | 122 +- src/H5Spoint.c | 15 +- src/H5Sprivate.h | 29 +- 20 files changed, 6449 insertions(+), 155 deletions(-) create mode 100644 src/H5FDmpio_topology.h diff --git a/src/H5Dmpio.c b/src/H5Dmpio.c index 2c06800..7aebbbb 100644 --- a/src/H5Dmpio.c +++ b/src/H5Dmpio.c @@ -618,6 +618,10 @@ H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_ H5D_chunk_map_t H5_ATTR_UNUSED *fm) { H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CONTIGUOUS_COLLECTIVE; + char *do_custom_agg; /* CCIO-read env variable */ + hid_t file_space_hid; /* file space for CCIO */ + hid_t mem_space_hid; /* memory space for CCIO */ + const H5D_contig_storage_t *store_contig; /* Storage structure for CCIO */ herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_PACKAGE @@ -625,9 +629,32 @@ H5D__contig_collective_read(H5D_io_info_t *io_info, const H5D_type_info_t *type_ /* Sanity check */ HDassert(H5FD_MPIO == H5F_DRIVER_ID(io_info->dset->oloc.file)); - /* Call generic internal collective I/O routine */ - if(H5D__inter_collective_io(io_info, type_info, file_space, mem_space) < 0) - HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish shared collective MPI-IO") + /* Check for CCIO-read option */ + do_custom_agg = HDgetenv("HDF5_CCIO_RD"); + if (do_custom_agg && (strcmp(do_custom_agg,"yes") == 0)) { + + /* Call select_read (rather than `inter_collective`) if using CCIO */ + file_space_hid = H5I_register(H5I_DATASPACE, file_space, TRUE); + mem_space_hid = H5I_register(H5I_DATASPACE, mem_space, TRUE); + + /* Contiguous storage info for this I/O operation: */ + store_contig = &(io_info->store->contig); + + if(H5F_select_read(io_info->dset->oloc.file, H5FD_MEM_DRAW, file_space_hid, mem_space_hid, (size_t)type_info->src_type_size, store_contig->dset_addr, io_info->u.rbuf) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't finish collective parallel read") + + if(NULL != ((H5S_t *)H5I_object_verify(file_space_hid, H5I_DATASPACE))) + H5Sclose(file_space_hid); + if(NULL != ((H5S_t *)H5I_object_verify(mem_space_hid, H5I_DATASPACE))) + H5Sclose(mem_space_hid); + + } else { + + /* Call generic internal collective I/O routine */ + if(H5D__inter_collective_io(io_info, type_info, file_space, mem_space) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "couldn't finish shared collective MPI-IO") + + } /* Set the actual I/O mode property.
internal_collective_io will not break to * independent I/O, so we set it here. @@ -658,6 +685,10 @@ H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type H5D_chunk_map_t H5_ATTR_UNUSED *fm) { H5D_mpio_actual_io_mode_t actual_io_mode = H5D_MPIO_CONTIGUOUS_COLLECTIVE; + char *do_custom_agg; /* CCIO-write env variable */ + hid_t file_space_hid; /* file space for CCIO */ + hid_t mem_space_hid; /* memory space for CCIO */ + const H5D_contig_storage_t *store_contig; /* Storage structure for CCIO */ herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_PACKAGE @@ -665,9 +696,32 @@ H5D__contig_collective_write(H5D_io_info_t *io_info, const H5D_type_info_t *type /* Sanity check */ HDassert(H5FD_MPIO == H5F_DRIVER_ID(io_info->dset->oloc.file)); - /* Call generic internal collective I/O routine */ - if(H5D__inter_collective_io(io_info, type_info, file_space, mem_space) < 0) - HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't finish shared collective MPI-IO") + /* Check for CCIO-write option */ + do_custom_agg = HDgetenv("HDF5_CCIO_WR"); + if (do_custom_agg && (strcmp(do_custom_agg,"yes") == 0)) { + + /* Call select_write (rather than `inter_collective` if using CCIO) */ + hid_t file_space_hid = H5I_register(H5I_DATASPACE, file_space,TRUE); + hid_t mem_space_hid = H5I_register(H5I_DATASPACE, mem_space,TRUE); + + /* Contiguous storage info for this I/O operation: */ + store_contig = &(io_info->store->contig); + + if(H5F_select_write(io_info->dset->oloc.file, H5FD_MEM_DRAW, file_space_hid, mem_space_hid, (size_t)type_info->src_type_size, store_contig->dset_addr, io_info->u.wbuf) < 0) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "can't finish collective parallel write") + + if(NULL != ((H5S_t *)H5I_object_verify(file_space_hid, H5I_DATASPACE))) + H5Sclose(file_space_hid); + if(NULL != ((H5S_t *)H5I_object_verify(mem_space_hid, H5I_DATASPACE))) + H5Sclose(mem_space_hid); + + } else { + + /* Call generic internal collective I/O routine */ + if(H5D__inter_collective_io(io_info, type_info, file_space, mem_space) < 0) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't finish shared collective MPI-IO") + + } /* Set the actual I/O mode property. internal_collective_io will not break to * independent I/O, so we set it here. @@ -743,7 +797,7 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf io_option = H5D_ONE_LINK_CHUNK_IO; /*no opt*/ /* direct request to multi-chunk-io */ else if(H5FD_MPIO_CHUNK_MULTI_IO == chunk_opt_mode) - io_option = H5D_MULTI_CHUNK_IO; + io_option = H5D_MULTI_CHUNK_IO; /* via default path. branch by num threshold */ else { unsigned one_link_chunk_io_threshold; /* Threshold to use single collective I/O for all chunks */ @@ -1054,9 +1108,9 @@ if(H5DEBUG(D)) /* Obtain MPI derived datatype from all individual chunks */ for(u = 0; u < num_chunk; u++) { - hsize_t *permute_map = NULL; /* array that holds the mapping from the old, - out-of-order displacements to the in-order - displacements of the MPI datatypes of the + hsize_t *permute_map = NULL; /* array that holds the mapping from the old, + out-of-order displacements to the in-order + displacements of the MPI datatypes of the point selection of the file space */ hbool_t is_permuted = FALSE; @@ -1066,8 +1120,8 @@ if(H5DEBUG(D)) * where it will be freed. 
*/ if(H5S_mpio_space_type(chunk_addr_info_array[u].chunk_info.fspace, - type_info->src_type_size, - &chunk_ftype[u], /* OUT: datatype created */ + type_info->src_type_size, + &chunk_ftype[u], /* OUT: datatype created */ &chunk_mpi_file_counts[u], /* OUT */ &(chunk_mft_is_derived_array[u]), /* OUT */ TRUE, /* this is a file space, @@ -1085,9 +1139,9 @@ if(H5DEBUG(D)) if(is_permuted) HDassert(permute_map); if(H5S_mpio_space_type(chunk_addr_info_array[u].chunk_info.mspace, - type_info->dst_type_size, &chunk_mtype[u], - &chunk_mpi_mem_counts[u], - &(chunk_mbt_is_derived_array[u]), + type_info->dst_type_size, &chunk_mtype[u], + &chunk_mpi_mem_counts[u], + &(chunk_mbt_is_derived_array[u]), FALSE, /* this is a memory space, so if the file space is not @@ -1947,9 +2001,9 @@ H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf if((file_space != NULL) && (mem_space != NULL)) { int mpi_file_count; /* Number of file "objects" to transfer */ - hsize_t *permute_map = NULL; /* array that holds the mapping from the old, - out-of-order displacements to the in-order - displacements of the MPI datatypes of the + hsize_t *permute_map = NULL; /* array that holds the mapping from the old, + out-of-order displacements to the in-order + displacements of the MPI datatypes of the point selection of the file space */ hbool_t is_permuted = FALSE; @@ -1958,8 +2012,8 @@ H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf * and will be fed into the next call to H5S_mpio_space_type * where it will be freed. */ - if(H5S_mpio_space_type(file_space, type_info->src_type_size, - &mpi_file_type, &mpi_file_count, &mft_is_derived, /* OUT: datatype created */ + if(H5S_mpio_space_type(file_space, type_info->src_type_size, + &mpi_file_type, &mpi_file_count, &mft_is_derived, /* OUT: datatype created */ TRUE, /* this is a file space, so permute the datatype if the point selection is out of @@ -1968,13 +2022,13 @@ H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf the permutation of points selected in case they are out of - order */ + order */ &is_permuted /* OUT */) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI file type") /* Sanity check */ if(is_permuted) HDassert(permute_map); - if(H5S_mpio_space_type(mem_space, type_info->src_type_size, + if(H5S_mpio_space_type(mem_space, type_info->src_type_size, &mpi_buf_type, &mpi_buf_count, &mbt_is_derived, /* OUT: datatype created */ FALSE, /* this is a memory space, so if the file space is not @@ -1986,7 +2040,7 @@ H5D__inter_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf generated by the file_space selection and applied to the - memory selection */, + memory selection */, &is_permuted /* IN */) < 0) HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't create MPI buffer type") /* Sanity check */ @@ -2556,7 +2610,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ local_info_array[i].num_writers = 0; local_info_array[i].owners.original_owner = local_info_array[i].owners.new_owner = mpi_rank; local_info_array[i].buf = NULL; - + local_info_array[i].async_info.num_receive_requests = 0; local_info_array[i].async_info.receive_buffer_array = NULL; local_info_array[i].async_info.receive_requests_array = NULL; @@ -3246,4 +3300,3 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5D__filtered_collective_chunk_entry_io() */ #endif /* H5_HAVE_PARALLEL */ - diff --git a/src/H5FDcore.c b/src/H5FDcore.c index 2ab04dc..ef346f7 100644 --- 
a/src/H5FDcore.c +++ b/src/H5FDcore.c @@ -170,6 +170,8 @@ static const H5FD_class_t H5FD_core_g = { H5FD__core_get_handle, /* get_handle */ H5FD__core_read, /* read */ H5FD__core_write, /* write */ + NULL, /* select_read */ + NULL, /* select_write */ H5FD__core_flush, /* flush */ H5FD__core_truncate, /* truncate */ H5FD_core_lock, /* lock */ diff --git a/src/H5FDdirect.c b/src/H5FDdirect.c index 906ec28..b39acf6 100644 --- a/src/H5FDdirect.c +++ b/src/H5FDdirect.c @@ -167,6 +167,8 @@ static const H5FD_class_t H5FD_direct_g = { H5FD_direct_get_handle, /*get_handle */ H5FD_direct_read, /*read */ H5FD_direct_write, /*write */ + NULL, /* select_read */ + NULL, /* select_write */ NULL, /*flush */ H5FD_direct_truncate, /*truncate */ H5FD_direct_lock, /*lock */ diff --git a/src/H5FDfamily.c b/src/H5FDfamily.c index e52a71a..bd15cd8 100644 --- a/src/H5FDfamily.c +++ b/src/H5FDfamily.c @@ -135,6 +135,8 @@ static const H5FD_class_t H5FD_family_g = { H5FD_family_get_handle, /*get_handle */ H5FD_family_read, /*read */ H5FD_family_write, /*write */ + NULL, /*select_read */ + NULL, /*select_write */ H5FD_family_flush, /*flush */ H5FD_family_truncate, /*truncate */ H5FD_family_lock, /*lock */ diff --git a/src/H5FDint.c b/src/H5FDint.c index ea8c4d8..22292b4 100644 --- a/src/H5FDint.c +++ b/src/H5FDint.c @@ -252,6 +252,81 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD_write() */ +/*------------------------------------------------------------------------- + * Function: H5FD_select_read + * + * Purpose: Private version of H5FDselect_read() + * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Quincey Koziol + * Saturday, November 4, 2017 + * + * Rick Zamora (Revised October 24, 2018) + * + *------------------------------------------------------------------------- + */ +herr_t +H5FD_select_read(H5FD_t *file, H5FD_mem_t type, + hid_t file_space, hid_t mem_space, size_t elmt_size, haddr_t addr, void *buf/*out*/) +{ + hid_t dxpl_id = H5I_INVALID_HID; /* DXPL for operation */ + herr_t ret_value = SUCCEED; /* Return value */ + FUNC_ENTER_NOAPI(FAIL) + {} + /* Sanity checks */ + HDassert(file); + HDassert(file->cls); + HDassert(file->cls->select_read); + HDassert(buf); + /* Get proper DXPL for I/O */ + dxpl_id = H5CX_get_dxpl(); + /* Dispatch to driver */ + if((file->cls->select_read)(file, type, dxpl_id, file_space, mem_space, elmt_size, addr + file->base_addr, buf) < 0) + HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "driver read request failed") +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5FD_select_read() */ + +/*------------------------------------------------------------------------- + * Function: H5FD_select_write + * + * Purpose: Private version of H5FDselect_write() + * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Quincey Koziol + * Saturday, November 4, 2017 + * + * Rick Zamora (Revised October 24, 2018) + * + *------------------------------------------------------------------------- + */ +herr_t +H5FD_select_write(H5FD_t *file, H5FD_mem_t type, + hid_t file_space, hid_t mem_space, size_t elmt_size, haddr_t addr, const void *buf) +{ + hid_t dxpl_id; /* DXPL for operation */ + haddr_t eoa = HADDR_UNDEF; /* EOA for file */ + herr_t ret_value = SUCCEED; /* Return value */ + FUNC_ENTER_NOAPI(FAIL) + {} + /* Sanity checks */ + HDassert(file); + HDassert(file->cls); + HDassert(file->cls->select_write); + HDassert(buf); + /* Get proper DXPL for I/O */ + dxpl_id = H5CX_get_dxpl(); + /* Dispatch to driver */ + if((file->cls->select_write)(file, type, 
dxpl_id, file_space, mem_space, elmt_size, addr + file->base_addr, buf) < 0) + HGOTO_ERROR(H5E_VFL, H5E_WRITEERROR, FAIL, "driver write request failed") +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5FD_select_write() */ + /*------------------------------------------------------------------------- * Function: H5FD_set_eoa diff --git a/src/H5FDlog.c b/src/H5FDlog.c index 3dcd7f5..18a61aa 100644 --- a/src/H5FDlog.c +++ b/src/H5FDlog.c @@ -211,6 +211,8 @@ static const H5FD_class_t H5FD_log_g = { H5FD_log_get_handle, /*get_handle */ H5FD_log_read, /*read */ H5FD_log_write, /*write */ + NULL, /*select_read */ + NULL, /*select_write */ NULL, /*flush */ H5FD_log_truncate, /*truncate */ H5FD_log_lock, /*lock */ diff --git a/src/H5FDmpio.c b/src/H5FDmpio.c index 87f8b6a..e4d7c87 100644 --- a/src/H5FDmpio.c +++ b/src/H5FDmpio.c @@ -21,7 +21,6 @@ #include "H5FDdrvr_module.h" /* This source code file is part of the H5FD driver module */ - #include "H5private.h" /* Generic Functions */ #include "H5CXprivate.h" /* API Contexts */ #include "H5Dprivate.h" /* Dataset functions */ @@ -32,8 +31,206 @@ #include "H5Iprivate.h" /* IDs */ #include "H5MMprivate.h" /* Memory management */ #include "H5Pprivate.h" /* Property lists */ +#include "H5FDmpio_topology.h" /* Topology API */ +#include +//#define topo_timing +//#define onesidedtrace #ifdef H5_HAVE_PARALLEL +#ifdef BGQ +#define inline +#endif + +/* optypes for ADIO Requests */ +#define READ_CA 26 +#define WRITE_CA 27 + +/*******************************/ +/* CCIO Typedefs and Functions */ +/*******************************/ + +typedef struct CustomAgg_FH_Struct_Data *CustomAgg_FH_Data; +typedef long ADIO_Offset_CA; + +/* + * Declaration of i/o thread data structure (bgmpio_pthreadwc) + */ +typedef struct wcThreadFuncData_CA { + MPI_File fh; + int io_kind; + char *buf; + MPI_Offset size; + MPI_Offset offset; + int error_code; + int myrank; +} ThreadFuncData; + +/* + * FSLayout determines how aggregators will be mapped to the file + * LUSTRE -> Aggregators will be mapped to specific LUSTRE-like stripes + * GPFS -> Aggregators will be each assigned to a contiguous file domain + */ +enum FSLayout{LUSTRE, GPFS}; + +/* + * Structure holding important info for CCIO options + * (Must be populated at the MPI_File_open) + */ +typedef struct CustomAgg_FH_Struct_Data { + MPI_Comm comm; + MPI_File fh; + int io_buf_put_amounts; + char *io_buf; + MPI_Win io_buf_window; /* Window over the io_buf to support one-sided aggregation */ + MPI_Win io_buf_put_amounts_window; /* Window over the io_buf_put_amounts */ + int ccio_read; + int ccio_write; + int cb_nodes; + int ppn; /* Only used in topology-aware cb selection if env var is set */ + int pps; /* Only used in topology-aware cb selection if env var is set */ + enum AGGSelect topo_cb_select; + int cb_buffer_size; + int fs_block_count; + int fs_block_size; + int onesided_always_rmw; + int onesided_no_rmw; + int onesided_inform_rmw; + int onesided_write_aggmethod; + int onesided_read_aggmethod; + int *ranklist; + int ranklist_populated; + /* ------- Added for Async IO ------- */ + int async_io_outer; /* Assume H5FD_mpio_ccio_osagg_write calls will only require 1 "inner" round */ + int async_io_inner; /* Assume File-domain aggregation mapping */ + char *io_buf_d; /* Duplicate for "outer" async IO */ + int io_buf_put_amounts_d; /* Duplicate for "outer" async IO */ + MPI_Win io_buf_window_d; /* Duplicate for "outer" async IO */ + MPI_Win io_buf_put_amounts_window_d; /* Duplicate for "outer" async IO */ + MPIO_Request io_Request; + 
MPIO_Request io_Request_d; + int check_req; + int check_req_d; + int use_dup; + int pthread_io; + /* ---------------------------------- */ + enum FSLayout fslayout; +} CustomAgg_FH_Struct_Data; + +/* + * This data structure holds parameters related to regulating + * the one-sided aggregation algorithm. + */ +typedef struct FS_Block_Parms { + int stripeSize; /* size in bytes of the "striping" unit - a size of 0 indicates to the */ + /* onesided algorithm that we are a non-striping file system */ + ADIO_Offset_CA segmentLen; /* size in bytes of the segment (stripeSize*number of aggs) */ + /* up to the size of the file) */ + int stripesPerAgg; /* the number of stripes to be packed into an agg cb for this segment */ + int segmentIter; /* segment number for the group of stripes currently being packed into */ + /* the agg cb - resets to 0 for each cb flush to the file system */ + int flushCB; /* once we have fully packed the cb on an agg this flags */ + /* tells us to now write to the file */ + ADIO_Offset_CA stripedLastFileOffset; /* since we are now just calling the onesided algorithm */ + /* with the offset range of segment, we still need to */ + /* know the actual last offset of the file. */ + int firstStripedIOCall; /* whether this is the first call in the first segement of the */ + /* onesided algorithm. */ + int lastStripedIOCall; /* whether this is the last call in the last segement of the */ + /* onesided algorithm. */ + int iWasUsedStripingAgg; /* whether this rank was ever a used agg for this striping segement */ + int numStripesUsed; /* the number of stripes packed into an aggregator */ + /* These 2 elements are the offset and lengths in the file corresponding to the actual stripes */ + MPI_Offset *stripeIOoffsets; + int *stripeIOLens; + int amountOfStripedDataExpected; /* used to determine holes in this segment thereby requiring a rmw */ + /* These 2 elements enable ADIOI_OneSidedWriteAggregation to be called multiple times but only */ + /* perform the potientially computationally costly flattening of the source buffer just once */ + hsize_t bufTypeExtent; + /* These three elements track the state of the source buffer advancement through multiple calls */ + /* to ADIOI_OneSidedWriteAggregation */ + ADIO_Offset_CA lastDataTypeExtent; + int lastFlatBufIndice; + ADIO_Offset_CA lastIndiceOffset; +} FS_Block_Parms; + +/* + * This data structure holds the access state of the source buffer for target + * file domains within aggregators corresponding to the target data blocks. + * The validity of the usage of this structure relies on the requirement that + * only 1 aggregator can write to agiven file domain. 
+ */ +typedef struct FDSourceBufferState_CA { + ADIO_Offset_CA indiceOffset; + hsize_t bufTypeExtent; + ADIO_Offset_CA dataTypeExtent; + int flatBufIndice; + ADIO_Offset_CA sourceBufferOffset; +} FDSourceBufferState_CA; + +void calc_file_domains(ADIO_Offset_CA *st_offsets, ADIO_Offset_CA *end_offsets, + int nprocs, int nprocs_for_coll, ADIO_Offset_CA *min_st_offset_ptr, + ADIO_Offset_CA **fd_start_ptr, ADIO_Offset_CA **fd_end_ptr, + ADIO_Offset_CA *fd_size_ptr, ADIO_Offset_CA blksize); + +void H5FD_mpio_ccio_write_one_sided(CustomAgg_FH_Data ca_data, const void *buf, + MPI_Offset mpi_off, H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int *error_code); + +void H5FD_mpio_ccio_read_one_sided(CustomAgg_FH_Data ca_data, void *buf, MPI_Offset mpi_off, + H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int *error_code); + +void H5FD_mpio_ccio_iterate_write(CustomAgg_FH_Data ca_data, const void *buf, + int *fs_block_info, ADIO_Offset_CA *offset_list, ADIO_Offset_CA *len_list, + MPI_Offset mpi_off, int contig_access_count, int currentValidDataIndex, + ADIO_Offset_CA start_offset, ADIO_Offset_CA end_offset, + ADIO_Offset_CA firstFileOffset, ADIO_Offset_CA lastFileOffset, + H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int myrank, int *error_code); + +void H5FD_mpio_ccio_iterate_read(CustomAgg_FH_Data ca_data, void *buf, + int *fs_block_info, ADIO_Offset_CA *offset_list, ADIO_Offset_CA *len_list, + MPI_Offset mpi_off, int contig_access_count, int currentValidDataIndex, + ADIO_Offset_CA start_offset, ADIO_Offset_CA end_offset, + ADIO_Offset_CA firstFileOffset, ADIO_Offset_CA lastFileOffset, + H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int myrank, int *error_code); + +void H5FD_mpio_ccio_osagg_write(CustomAgg_FH_Data ca_data, + ADIO_Offset_CA *offset_list, + ADIO_Offset_CA *len_list, + int contig_access_count, + const void *buf, + H5S_flatbuf_t *memFlatBuf, + int *error_code, + ADIO_Offset_CA firstFileOffset, + ADIO_Offset_CA lastFileOffset, + int numNonZeroDataOffsets, + ADIO_Offset_CA *fd_start, + ADIO_Offset_CA* fd_end, + int hole_found, + FS_Block_Parms *stripe_parms); + +void H5FD_mpio_ccio_osagg_read(CustomAgg_FH_Data ca_data, + ADIO_Offset_CA *offset_list, + ADIO_Offset_CA *len_list, + int contig_access_count, + const void *buf, + H5S_flatbuf_t *flatBuf, + int *error_code, + ADIO_Offset_CA firstFileOffset, + ADIO_Offset_CA lastFileOffset, + int numNonZeroDataOffsets, + ADIO_Offset_CA *fd_start, + ADIO_Offset_CA* fd_end, + FS_Block_Parms *stripe_parms, + int do_file_read); + +void H5FD_mpio_ccio_file_read(CustomAgg_FH_Data ca_data, int *error_code, + ADIO_Offset_CA firstFileOffset, ADIO_Offset_CA lastFileOffset, + ADIO_Offset_CA *fd_start, ADIO_Offset_CA* fd_end); + +void *IO_Thread_Func(void *vptr_args); + +/***********************************/ +/* END CCIO Typedefs and Functions */ +/***********************************/ /* * The driver identification number, initialized at runtime if H5_HAVE_PARALLEL @@ -68,6 +265,7 @@ typedef struct H5FD_mpio_t { haddr_t eoa; /*end-of-address marker */ haddr_t last_eoa; /* Last known end-of-address marker */ haddr_t local_eof; /* Local end-of-file address for each process */ + CustomAgg_FH_Struct_Data custom_agg_data; } H5FD_mpio_t; /* Private Prototypes */ @@ -89,12 +287,20 @@ static herr_t H5FD_mpio_read(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, hadd size_t size, void *buf); static herr_t H5FD_mpio_write(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, haddr_t addr, size_t size, const void *buf); +static herr_t 
H5FD_mpio_custom_read(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, + hid_t file_space, hid_t mem_space, size_t elmt_size, haddr_t addr, void *buf); +static herr_t H5FD_mpio_custom_write(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, + hid_t file_space, hid_t mem_space, size_t elmt_size, haddr_t addr, const void *buf); static herr_t H5FD_mpio_flush(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); static herr_t H5FD_mpio_truncate(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); static int H5FD_mpio_mpi_rank(const H5FD_t *_file); static int H5FD_mpio_mpi_size(const H5FD_t *_file); static MPI_Comm H5FD_mpio_communicator(const H5FD_t *_file); static herr_t H5FD_mpio_get_info(H5FD_t *_file, void** mpi_info); +static herr_t H5FD_mpio_ccio_setup(const char *name, H5FD_mpio_t *file, MPI_File fh); +static herr_t H5FD_mpio_ccio_cleanup(const H5FD_mpio_t *file); +static herr_t H5FD_mpio_setup_flatbuf( H5S_sel_type space_sel_type, H5S_flatbuf_t *curflatbuf, + H5S_sel_iter_t *sel_iter, H5S_t *space_stype, size_t elmt_size, hbool_t is_regular); /* The MPIO file driver information */ static const H5FD_class_mpi_t H5FD_mpio_g = { @@ -126,6 +332,8 @@ static const H5FD_class_mpi_t H5FD_mpio_g = { H5FD_mpio_get_handle, /*get_handle */ H5FD_mpio_read, /*read */ H5FD_mpio_write, /*write */ + H5FD_mpio_custom_read, /*select_read */ + H5FD_mpio_custom_write, /*select_write */ H5FD_mpio_flush, /*flush */ H5FD_mpio_truncate, /*truncate */ NULL, /*lock */ @@ -157,7 +365,6 @@ static int H5FD_mpio_Debug[256] = 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 }; #endif - /*-------------------------------------------------------------------------- NAME @@ -1013,6 +1220,10 @@ H5FD_mpio_open(const char *name, unsigned flags, hid_t fapl_id, file->mpi_rank = mpi_rank; file->mpi_size = mpi_size; + /* Setup structures for ccio optimizations */ + /* (Optimizations used for select_ calls) */ + H5FD_mpio_ccio_setup(name, file, fh); + /* Only processor p0 will get the filesize and broadcast it. */ if (mpi_rank == 0) { if (MPI_SUCCESS != (mpi_code=MPI_File_get_size(fh, &size))) @@ -1106,6 +1317,9 @@ H5FD_mpio_close(H5FD_t *_file) if (MPI_SUCCESS != (mpi_code=MPI_File_close(&(file->f)/*in,out*/))) HMPI_GOTO_ERROR(FAIL, "MPI_File_close failed", mpi_code) + /* Clean structures used for ccio optimizations */ + H5FD_mpio_ccio_cleanup( file ); + /* Clean up other stuff */ H5FD_mpi_comm_info_free(&file->comm, &file->info); H5MM_xfree(file); @@ -1863,7 +2077,584 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD_mpio_write() */ - +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_setup_flatbuf + * + * Purpose: Define the flatbuf structure needed by H5FD_mpio_custom_write. + * This is a helper function to avoid repeating code. The flatbuf can + * be a file or memory flatbuf -- and the structure depends on the type + * of selection. 
+ * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Rick Zamora, 2018-07-03 + * (Based on code originally in H5FD_mpio_custom_write) + *------------------------------------------------------------------------- + */ +static herr_t +H5FD_mpio_setup_flatbuf( H5S_sel_type space_sel_type, H5S_flatbuf_t *curflatbuf, + H5S_sel_iter_t *sel_iter, H5S_t *space_stype, size_t elmt_size, hbool_t is_regular) +{ + herr_t ret_value = SUCCEED; + size_t sel_nseq, sel_nelem; + hsize_t flatBufSize; + unsigned null_flags = 0; + hsize_t num_points; + hssize_t snum_points; + int numSpaceDims = 0; + int numSelDims = 0; + H5S_hyper_dim_t *diminfo; + hsize_t numBlockEntries = 1; + hsize_t numElements = 1; + herr_t rc = 0; + + if (space_sel_type == H5S_SEL_NONE) { + curflatbuf->indices = NULL; + curflatbuf->blocklens = NULL; + curflatbuf->count = 0; + curflatbuf->size = 0; + curflatbuf->extent = 0; +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"space_sel_type == H5S_SEL_NONE for flatbuf - setting everything to 0\n"); + fflush(stdout); + } +#endif + } + else if (space_sel_type == H5S_SEL_ALL) { + + /* For H5S_SEL_ALL there is just 1 big block */ + curflatbuf->indices = (hsize_t *) H5MM_malloc(1 * sizeof(hsize_t)); + curflatbuf->blocklens = (size_t *) H5MM_malloc(1 * sizeof(size_t)); + curflatbuf->count = 1; + + if(H5S__all_get_seq_list(space_stype,null_flags,sel_iter,1,sel_iter->elmt_left,&sel_nseq,&sel_nelem,curflatbuf->indices,curflatbuf->blocklens) < 0) + { + fprintf(stdout,"ERROR: H5S__all_get_seq_list failed"); + ret_value = FAIL; + } + + curflatbuf->extent = curflatbuf->indices[0] + (hsize_t)curflatbuf->blocklens[0]; + + flatBufSize = 0; + for (int j=0;j<curflatbuf->count;j++) { + flatBufSize += curflatbuf->blocklens[j]; + } + curflatbuf->size = flatBufSize; + +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"space_sel_type == H5S_SEL_ALL for flatbuf - curflatbuf->size is %ld curflatbuf->indices[0] is %ld curflatbuf->blocklens[0] is %ld curflatbuf->extent is %ld\n",curflatbuf->size,curflatbuf->indices[0],curflatbuf->blocklens[0],curflatbuf->extent); + fflush(stdout); + } +#endif + } + else if (space_sel_type == H5S_SEL_POINTS) { + + if((snum_points = (hssize_t)H5S_GET_SELECT_NPOINTS(space_stype)) < 0) + { + fprintf(stdout,"ERROR: can't get number of elements selected"); + ret_value = FAIL; + } + num_points = (hsize_t)snum_points; + + curflatbuf->indices = (hsize_t *) H5MM_malloc(num_points * sizeof(hsize_t)); + curflatbuf->blocklens = (size_t *) H5MM_malloc(num_points * sizeof(size_t)); + curflatbuf->count = 1; + + /* Get the extent */ + hsize_t dims[H5O_LAYOUT_NDIMS]; /* Total size of memory buf */ + if((numSpaceDims = H5S_get_simple_extent_dims (space_stype, dims, NULL)) < 0){ + fprintf(stdout,"ERROR: unable to retrieve data space dimensions"); + ret_value = FAIL; + } + curflatbuf->extent = 1; + for (int j=0;j<numSpaceDims;j++) + curflatbuf->extent *= dims[j]; + + if(H5S_point_get_seq_list(space_stype,null_flags,sel_iter,num_points,num_points,&sel_nseq,&sel_nelem,curflatbuf->indices,curflatbuf->blocklens) < 0) + { + fprintf(stdout,"ERROR: H5S_point_get_seq_list failed"); + ret_value = FAIL; + } + + flatBufSize = 0; + for (int j=0;j<curflatbuf->count;j++) { + flatBufSize += curflatbuf->blocklens[j]; + } + curflatbuf->size = flatBufSize; + +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"space_sel_type == H5S_SEL_POINTS called H5S_point_get_seq_list for file flatbuf - curflatbuf->count is %ld numSpaceDims is %d curflatbuf->extent is %ld curflatbuf->size is %ld returned sel_nseq %ld sel_nelem %ld offset/len pairs for curflatbuf->count entries are:\n",curflatbuf->count,numSpaceDims,curflatbuf->extent,curflatbuf->size,sel_nseq,sel_nelem); + for (int j=0;j<curflatbuf->count;j++) + fprintf(stdout, " %d offset: %ld len: %ld\n",j,curflatbuf->indices[j],curflatbuf->blocklens[j]); + fflush(stdout); + } +#endif + } + else if (space_sel_type == H5S_SEL_HYPERSLABS) { + + if (!is_regular){ + fprintf(stdout, "ERROR: irregular space selection not supported"); + // rjz - commenting this check for now: + //ret_value = FAIL; + } + + diminfo = sel_iter->u.hyp.diminfo; + HDassert(diminfo); + + /* Here we need to use a function inside the space module since that is where the H5S_t structure is + * actually defined. + */ + rc = H5S_mpio_return_space_rank_and_extent(space_stype, &numSpaceDims, &(curflatbuf->extent)); + + curflatbuf->extent *= elmt_size; + + /* Check for flattened selection, if so use the selection iter_rank for the number of + * dimensions instead of the space rank. + */ + if(sel_iter->u.hyp.iter_rank != 0 && sel_iter->u.hyp.iter_rank < numSpaceDims) + numSelDims = sel_iter->u.hyp.iter_rank; + else + numSelDims = numSpaceDims; + +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"space_sel_type == H5S_SEL_HYPERSLABS computing numBlockEntries and numElements\n"); + fflush(stdout); + } +#endif + + numBlockEntries = 1, numElements = 1; + for(int u = 0; u < numSelDims; u++) { +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"iter %d diminfo[u].count is %ld and diminfo[u].block is %ld\n",u,diminfo[u].count,diminfo[u].block); + fflush(stdout); + } +#endif + if (u < (numSelDims-1)) { + numBlockEntries *= (diminfo[u].count * diminfo[u].block); + numElements *= (diminfo[u].count * diminfo[u].block); + } + else { + numBlockEntries *= diminfo[u].count; + numElements *= (diminfo[u].count * diminfo[u].block); + } + } + + curflatbuf->indices = (hsize_t *) H5MM_malloc(numElements * sizeof(hsize_t)); + curflatbuf->blocklens = (size_t *) H5MM_malloc(numElements * sizeof(size_t)); + curflatbuf->count = numBlockEntries; + +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"calling H5S__hyper_get_seq_list for file flatbuf - numSelDims is %d numElements is %ld curflatbuf->count is %ld curflatbuf->extent is %ld\n",numSelDims,numElements, curflatbuf->count, curflatbuf->extent); + fflush(stdout); + } +#endif + + if(H5S__hyper_get_seq_list(space_stype,null_flags,sel_iter,numElements,numElements,&sel_nseq,&sel_nelem,curflatbuf->indices,curflatbuf->blocklens) < 0) + { + fprintf(stdout,"ERROR: H5S__hyper_get_seq_list failed"); + ret_value = FAIL; + } + + flatBufSize = 0; + for (int j=0;j<curflatbuf->count;j++) { + flatBufSize += curflatbuf->blocklens[j]; + } + curflatbuf->size = flatBufSize; + +#ifdef H5FDmpio_DEBUG + if (H5FD_mpio_Debug[(int)'t']) { + fprintf(stdout,"called H5S__hyper_get_seq_list for file flatbuf - numSelDims is %d numElements is %ld curflatbuf->count is %ld curflatbuf->extent is %ld curflatbuf->size is %ld returned sel_nseq %ld sel_nelem %ld offset/len pairs for curflatbuf->count entries are:\n",numSelDims,numElements, curflatbuf->count,curflatbuf->extent,curflatbuf->size,sel_nseq,sel_nelem); + for (int j=0;j<curflatbuf->count;j++) + fprintf(stdout, " %d offset: %ld len: %ld\n",j,curflatbuf->indices[j],curflatbuf->blocklens[j]); + fflush(stdout); + } +#endif + + } + else { + fprintf(stdout, "ERROR: In H5FD_mpio_setup_flatbuf, space selection type not recognized"); + ret_value = FAIL; + } + + return ret_value; +} /* 
H5FD_mpio_setup_flatbuf */ + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_custom_write + * + * Purpose: Writes data from a memory flatbuf into a file flatbuf. The memory + * and file flatbuf structures are defined using the H5S_<*>_get_seq_list + * functions, where <*> depends on the type of selection: all, points, + * hyperslab, or none. + * Note that this function is called from H5FD_select_write(), and is + * used to call optimized "write" routines defined in the "custom-collective + * IO virtual file layer" (CCIO) of the MPIO-VFD (see H5FDmpio_ccio.c). + * + * Return: + * + * Programmer: Quincey Koziol and Paul Coffman + * Unknown (Winter), 2018 + * + * Modifications: + * Rick Zamora, 2018-07-02 + * cleanup and refactoring. + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_mpio_custom_write(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, + hid_t file_space, hid_t mem_space, size_t elmt_size, haddr_t addr, const void *buf) +{ + H5FD_mpio_t *file = (H5FD_mpio_t*)_file; + MPI_Offset mpi_off; + MPI_Status mpi_stat; /* Status from I/O operation */ + H5S_t *file_space_stype; + int file_space_ref_count; + H5S_t *mem_space_stype; + int mem_space_ref_count; + int mpi_code; /* MPI return code */ +#if MPI_VERSION >= 3 + MPI_Count bytes_written; + MPI_Count type_size; /* MPI datatype used for I/O's size */ + MPI_Count io_size; /* Actual number of bytes requested */ +#else + int bytes_written; + int type_size; /* MPI datatype used for I/O's size */ + int io_size; /* Actual number of bytes requested */ +#endif + int size_i; + H5P_genplist_t *plist = NULL; /* Property list pointer */ + H5FD_mpio_xfer_t xfer_mode; /* I/O tranfer mode */ + herr_t ret_value = SUCCEED; + H5S_flatbuf_t file_flatbuf; + H5S_flatbuf_t mem_flatbuf; + hbool_t is_permuted = FALSE; + hbool_t is_regular = TRUE; + H5S_sel_iter_t sel_iter; + H5S_class_t file_space_extent_type; + H5S_class_t mem_space_extent_type; + H5S_sel_type file_space_sel_type; + H5S_sel_type mem_space_sel_type; + herr_t rc = 0; + hsize_t *permute_map = NULL; + + /* Note: permute_map array holds the mapping from the old (out-of-order) + * displacements to the in-order displacements of the H5S_flatbuf_t of the + * point selection of the file space. 
+ */ + + FUNC_ENTER_NOAPI_NOINIT + + /* File and memory space setup */ + file_space_stype = (H5S_t *) H5I_remove(file_space); + file_space_ref_count = H5I_dec_ref(file_space); + mem_space_stype = (H5S_t *) H5I_remove(mem_space); + mem_space_ref_count = H5I_dec_ref(mem_space); + + /* some numeric conversions */ + if(H5FD_mpi_haddr_to_MPIOff(addr, &mpi_off) < 0) + HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't convert from haddr to MPI off") + + size_i = (int)elmt_size; + if((hsize_t)size_i != elmt_size) + HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't convert from elmt_size to size_i") + + HDassert(file); + HDassert(H5FD_MPIO==file->pub.driver_id); + + /* Make certain we have the correct type of property list */ + HDassert(H5I_GENPROP_LST==H5I_get_type(dxpl_id)); + HDassert(TRUE==H5P_isa_class(dxpl_id,H5P_DATASET_XFER)); + HDassert(buf); + + /* Portably initialize MPI status variable */ + HDmemset(&mpi_stat, 0, sizeof(MPI_Status)); + + /* + * Create flatbuf for FILE space selection + */ + + if(H5S_select_iter_init(&sel_iter, file_space_stype, elmt_size) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTINIT, FAIL, "unable to initialize selection iterator") + + rc = H5S_mpio_return_space_extent_and_select_type(file_space_stype, &is_permuted, &is_regular, &file_space_extent_type, &file_space_sel_type); + + if(is_permuted) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "permuted space selections not supported") + + /* Currently, file_space_extent_type must be H5S_NULL, H5S_SCALAR, or H5S_SIMPLE */ + if (!((file_space_extent_type == H5S_NULL) || (file_space_extent_type == H5S_SCALAR) || (file_space_extent_type == H5S_SIMPLE))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "file space extent type invalid") + + if(H5S_SELECT_ITER_RELEASE(&sel_iter) < 0) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTRELEASE, FAIL, "unable to release selection iterator") + + if ((file_space_sel_type == H5S_SEL_NONE) || (file_space_sel_type == H5S_SEL_ALL) || + (file_space_sel_type == H5S_SEL_POINTS) || (file_space_sel_type == H5S_SEL_HYPERSLABS)) { + if( H5FD_mpio_setup_flatbuf( file_space_sel_type, &file_flatbuf, &sel_iter, file_space_stype, elmt_size, is_regular ) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "Call to H5FD_mpio_setup_flatbuf failed for FILE") + } + else { + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "Space selection type not recognized") + } + +#ifdef onesidedtrace +// +// typedef struct H5S_flatbuf_t { +// hsize_t count; /* number of contiguous blocks */ +// size_t *blocklens; /* array of contiguous block lengths (bytes)*/ +// hsize_t *indices; /*array of byte offsets of each block */ +// hsize_t extent; /* offset range for one instance of this flatbuf */ +// hsize_t size; /* number of bytes of block data */ +// } H5S_flatbuf_t; +// + printf("_______ - file_flatbuf.count = %d, file_flatbuf.extent = %d, file_flatbuf.size = %d\n",file_flatbuf.count,file_flatbuf.extent,file_flatbuf.size); + for (int i=0; icustom_agg_data), buf, mpi_off, &mem_flatbuf, &file_flatbuf, &error_code); + if (file_flatbuf.indices) H5MM_free(file_flatbuf.indices); + if (file_flatbuf.blocklens) H5MM_free(file_flatbuf.blocklens); + if (mem_flatbuf.indices) H5MM_free(mem_flatbuf.indices); + if (mem_flatbuf.blocklens) H5MM_free(mem_flatbuf.blocklens); + + } + else { + /* + * Not collective IO, just do MPI_File_write_at - don't support this for now + */ + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "H5FD_MPIO_COLLECTIVE xfer mode required for custom aggregation") + } + +done: + FUNC_LEAVE_NOAPI(ret_value) +} + 
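For reference, here is a minimal usage sketch (not part of this patch) of how an application might drive the select_write path implemented above. It relies only on the environment variables introduced by this patch (HDF5_CCIO_WR, HDF5_CCIO_CB_NODES, HDF5_CCIO_CB_SIZE) and on standard parallel-HDF5 public API calls; the file name, dataset name, aggregator count, and sizes are illustrative assumptions.

/*
 * Illustrative sketch only -- not part of the patch.  Each rank writes one
 * contiguous hyperslab of a contiguous dataset using collective I/O, which
 * is the case handled by H5D__contig_collective_write() above.
 */
#include <stdlib.h>
#include <mpi.h>
#include "hdf5.h"

int main(int argc, char **argv)
{
    int     rank, nprocs;
    hsize_t per_rank = 1024;            /* elements written by each rank (illustrative) */
    double  buf[1024];

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    /* Enable the CCIO write path before the file is opened; setting these
     * variables in the job environment is equivalent. */
    setenv("HDF5_CCIO_WR", "yes", 1);
    setenv("HDF5_CCIO_CB_NODES", "4", 1);       /* number of aggregators (illustrative) */
    setenv("HDF5_CCIO_CB_SIZE", "16777216", 1); /* 16 MiB collective buffer (illustrative) */

    /* Open the file with the MPIO VFD so H5FD_mpio_ccio_setup() runs at open time */
    hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
    H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
    hid_t file = H5Fcreate("ccio_demo.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

    /* One contiguous 1-D dataset, partitioned into per-rank hyperslabs */
    hsize_t dims[1]  = {per_rank * (hsize_t)nprocs};
    hid_t   fspace   = H5Screate_simple(1, dims, NULL);
    hid_t   dset     = H5Dcreate2(file, "data", H5T_NATIVE_DOUBLE, fspace,
                                  H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
    hsize_t start[1] = {per_rank * (hsize_t)rank};
    hsize_t count[1] = {per_rank};
    H5Sselect_hyperslab(fspace, H5S_SELECT_SET, start, NULL, count, NULL);
    hid_t mspace = H5Screate_simple(1, count, NULL);

    /* CCIO requires collective transfer mode (see the xfer_mode check above) */
    hid_t dxpl = H5Pcreate(H5P_DATASET_XFER);
    H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE);

    for (hsize_t i = 0; i < per_rank; i++)
        buf[i] = (double)(start[0] + i);
    H5Dwrite(dset, H5T_NATIVE_DOUBLE, mspace, fspace, dxpl, buf);

    H5Pclose(dxpl); H5Sclose(mspace); H5Dclose(dset);
    H5Sclose(fspace); H5Fclose(file); H5Pclose(fapl);
    MPI_Finalize();
    return 0;
}

With HDF5_CCIO_WR=yes, the H5Dwrite() above should reach H5D__contig_collective_write(), which registers the file and memory dataspaces and calls H5F_select_write(); that in turn dispatches to the H5FD_mpio_custom_write() callback rather than the generic H5D__inter_collective_io() path.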
+/*------------------------------------------------------------------------- + * Function: H5FD_mpio_custom_read + * + * Purpose: Reads data from a file flatbuf into a memory flatbuf. The memory + * and file flatbuf structures are defined using the H5S_<*>_get_seq_list + * functions, where <*> depends on the type of selection: all, points, + * hyperslab, or none. + * Note that this function is called from H5FD_select_read(), and is + * used to call optimized "read" routines defined in the "custom-collective + * IO virtual file layer" (CCIO) of the MPIO-VFD (see H5FDmpio_ccio.c). + * + * Return: + * + * Programmer: Rick Zamora, 2018-07-10 + * + * + * Modifications: + * Rick Zamora, 2018-11-06 + * cleanup and refactoring. + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_mpio_custom_read(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, + hid_t file_space, hid_t mem_space, size_t elmt_size, haddr_t addr, void *buf) +{ + + H5FD_mpio_t *file = (H5FD_mpio_t*)_file; + MPI_Offset mpi_off; + MPI_Status mpi_stat; /* Status from I/O operation */ + H5S_t *file_space_stype; + int file_space_ref_count; + H5S_t *mem_space_stype; + int mem_space_ref_count; + int size_i; + H5P_genplist_t *plist = NULL; /* Property list pointer */ + H5FD_mpio_xfer_t xfer_mode; /* I/O tranfer mode */ + herr_t ret_value = SUCCEED; + H5S_flatbuf_t file_flatbuf; + H5S_flatbuf_t mem_flatbuf; + hbool_t is_permuted = FALSE; + hbool_t is_regular = TRUE; + H5S_sel_iter_t sel_iter; + H5S_class_t file_space_extent_type; + H5S_class_t mem_space_extent_type; + H5S_sel_type file_space_sel_type; + H5S_sel_type mem_space_sel_type; + herr_t rc = 0; + hsize_t *permute_map = NULL; + + /* Note: permute_map array holds the mapping from the old (out-of-order) + * displacements to the in-order displacements of the H5S_flatbuf_t of the + * point selection of the file space. 
+ */ + + FUNC_ENTER_NOAPI_NOINIT + + /* File and memory space setup */ + file_space_stype = (H5S_t *) H5I_remove(file_space); + file_space_ref_count = H5I_dec_ref(file_space); + mem_space_stype = (H5S_t *) H5I_remove(mem_space); + mem_space_ref_count = H5I_dec_ref(mem_space); + + /* some numeric conversions */ + if(H5FD_mpi_haddr_to_MPIOff(addr, &mpi_off) < 0) + HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't convert from haddr to MPI off") + + size_i = (int)elmt_size; + if((hsize_t)size_i != elmt_size) + HGOTO_ERROR(H5E_INTERNAL, H5E_BADRANGE, FAIL, "can't convert from elmt_size to size_i") + + HDassert(file); + HDassert(H5FD_MPIO==file->pub.driver_id); + + /* Make certain we have the correct type of property list */ + HDassert(H5I_GENPROP_LST==H5I_get_type(dxpl_id)); + HDassert(TRUE==H5P_isa_class(dxpl_id,H5P_DATASET_XFER)); + HDassert(buf); + + /* Portably initialize MPI status variable */ + HDmemset(&mpi_stat, 0, sizeof(MPI_Status)); + + /* + * Create flatbuf for FILE space selection + */ + + if(H5S_select_iter_init(&sel_iter, file_space_stype, elmt_size) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTINIT, FAIL, "unable to initialize selection iterator") + + rc = H5S_mpio_return_space_extent_and_select_type(file_space_stype, &is_permuted, &is_regular, &file_space_extent_type, &file_space_sel_type); + + if(is_permuted) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "permuted space selections not supported") + + /* Currently, file_space_extent_type must be H5S_NULL, H5S_SCALAR, or H5S_SIMPLE */ + if (!((file_space_extent_type == H5S_NULL) || (file_space_extent_type == H5S_SCALAR) || (file_space_extent_type == H5S_SIMPLE))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "file space extent type invalid") + + if(H5S_SELECT_ITER_RELEASE(&sel_iter) < 0) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTRELEASE, FAIL, "unable to release selection iterator") + + if ((file_space_sel_type == H5S_SEL_NONE) || (file_space_sel_type == H5S_SEL_ALL) || + (file_space_sel_type == H5S_SEL_POINTS) || (file_space_sel_type == H5S_SEL_HYPERSLABS)) { + if( H5FD_mpio_setup_flatbuf( file_space_sel_type, &file_flatbuf, &sel_iter, file_space_stype, elmt_size, is_regular ) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "Call to H5FD_mpio_setup_flatbuf failed for FILE") + } + else { + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "Space selection type not recognized") + } + + /* + * Create flatbuf for MEMORY space selection + */ + + if(H5S_select_iter_init(&sel_iter, mem_space_stype, elmt_size) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_CANTINIT, FAIL, "unable to initialize selection iterator") + + rc = H5S_mpio_return_space_extent_and_select_type(mem_space_stype, &is_permuted, &is_regular, &mem_space_extent_type, &mem_space_sel_type); + + if(is_permuted) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "permuted space selections not supported") + + /* Currently, mem_space_extent_type must be H5S_NULL, H5S_SCALAR, or H5S_SIMPLE */ + if (!((mem_space_extent_type == H5S_NULL) || (mem_space_extent_type == H5S_SCALAR) || (mem_space_extent_type == H5S_SIMPLE))) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "space extent type invalid") + + if ((mem_space_sel_type == H5S_SEL_NONE) || (mem_space_sel_type == H5S_SEL_ALL) || + (mem_space_sel_type == H5S_SEL_POINTS) || (mem_space_sel_type == H5S_SEL_HYPERSLABS)) { + if( H5FD_mpio_setup_flatbuf( mem_space_sel_type, &mem_flatbuf, &sel_iter, mem_space_stype, elmt_size, is_regular ) < 0) + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "Call to H5FD_mpio_setup_flatbuf failed for MEM") + } 
+ else { + HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "Space selection type not recognized") + } + + if(H5S_SELECT_ITER_RELEASE(&sel_iter) < 0) + HDONE_ERROR(H5E_DATASPACE, H5E_CANTRELEASE, FAIL, "unable to release selection iterator") + + /* Obtain the data transfer properties */ + if(NULL == (plist = (H5P_genplist_t *)H5I_object(dxpl_id))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access property list") + + /* get the transfer mode from the dxpl */ + if(H5P_get(plist, H5D_XFER_IO_XFER_MODE_NAME, &xfer_mode)<0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode") + + /* + * If using collective IO call the custom agggregation algorithm here. + */ + if(xfer_mode == H5FD_MPIO_COLLECTIVE) { + + int error_code; + H5FD_mpio_ccio_read_one_sided((CustomAgg_FH_Data)&(file->custom_agg_data), buf, mpi_off, &mem_flatbuf, &file_flatbuf, &error_code); + if (file_flatbuf.indices) H5MM_free(file_flatbuf.indices); + if (file_flatbuf.blocklens) H5MM_free(file_flatbuf.blocklens); + if (mem_flatbuf.indices) H5MM_free(mem_flatbuf.indices); + if (mem_flatbuf.blocklens) H5MM_free(mem_flatbuf.blocklens); + + } + else { + /* + * Not collective IO, just do MPI_File_write_at - don't support this for now + */ + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, FAIL, "H5FD_MPIO_COLLECTIVE xfer mode required for custom aggregation") + } + +done: + FUNC_LEAVE_NOAPI(ret_value) +} + /*------------------------------------------------------------------------- * Function: H5FD_mpio_flush * @@ -1908,20 +2699,19 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD_mpio_flush() */ - /*------------------------------------------------------------------------- * Function: H5FD_mpio_truncate * * Purpose: Make certain the file's size matches it's allocated size * - * This is a little sticky in the mpio case, as it is not + * This is a little sticky in the mpio case, as it is not * easy for us to track the current EOF by extracting it from - * write calls. + * write calls. * * Instead, we first check to see if the eoa has changed since - * the last call to this function. If it has, we call - * MPI_File_get_size() to determine the current EOF, and - * only call MPI_File_set_size() if this value disagrees + * the last call to this function. If it has, we call + * MPI_File_get_size() to determine the current EOF, and + * only call MPI_File_set_size() if this value disagrees * with the current eoa. * * Return: Success: Non-negative @@ -1932,11 +2722,11 @@ done: * * Changes: Heavily reworked to avoid unnecessary MPI_File_set_size() * calls. The hope is that these calls are superfluous in the - * typical case, allowing us to avoid truncates most of the + * typical case, allowing us to avoid truncates most of the * time. * - * The basic idea is to query the file system to get the - * current eof, and only truncate if the file systems + * The basic idea is to query the file system to get the + * current eof, and only truncate if the file systems * conception of the eof disagrees with our eoa. * * JRM -- 10/27/17 @@ -1963,13 +2753,13 @@ H5FD_mpio_truncate(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, hbool_t H5_ATTR_ MPI_Offset size; MPI_Offset needed_eof; - /* In principle, it is possible for the size returned by the - * call to MPI_File_get_size() to depend on whether writes from + /* In principle, it is possible for the size returned by the + * call to MPI_File_get_size() to depend on whether writes from * all proceeses have completed at the time process 0 makes the - * call. + * call. * * In practice, most (all?) 
truncate calls will come after a barrier - * and with no interviening writes to the file (with the possible + * and with no interviening writes to the file (with the possible * exception of sueprblock / superblock extension message updates). * * Check the "MPI file closing" flag in the API context to determine @@ -2000,13 +2790,13 @@ H5FD_mpio_truncate(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, hbool_t H5_ATTR_ if(MPI_SUCCESS != (mpi_code = MPI_File_set_size(file->f, needed_eof))) HMPI_GOTO_ERROR(FAIL, "MPI_File_set_size failed", mpi_code) - /* In general, we must wait until all processes have finished - * the truncate before any process can continue, since it is - * possible that a process would write at the end of the + /* In general, we must wait until all processes have finished + * the truncate before any process can continue, since it is + * possible that a process would write at the end of the * file, and this write would be discarded by the truncate. * - * While this is an issue for a user initiated flush, it may - * not be an issue at file close. If so, we may be able to + * While this is an issue for a user initiated flush, it may + * not be an issue at file close. If so, we may be able to * optimize out the following barrier in that case. */ if(MPI_SUCCESS != (mpi_code = MPI_Barrier(file->comm))) @@ -2114,5 +2904,4278 @@ H5FD_mpio_communicator(const H5FD_t *_file) FUNC_LEAVE_NOAPI(file->comm) } /* end H5FD_mpio_communicator() */ -#endif /* H5_HAVE_PARALLEL */ - +/*------------------------------------------------------------------------- + * Function: HDF5_ccio_win_setup + * + * Purpose: Function to setup one-sided communication structures. + * + * Return: MPI_SUCCESS on success. + * + * Note: This function must be called in mpio_open + * + *------------------------------------------------------------------------- + */ +int HDF5_ccio_win_setup(CustomAgg_FH_Data ca_data, int procs) { + + int ret = MPI_SUCCESS; + ret = MPI_Win_create(ca_data->io_buf,ca_data->cb_buffer_size,1,MPI_INFO_NULL,ca_data->comm, &(ca_data->io_buf_window)); +#ifdef onesidedtrace + printf("CREATING ca_data->io_buf_window %016lx - ret = %d.\n",ca_data->io_buf_window,ret); +#endif + if (ret != MPI_SUCCESS) goto fn_exit; + ca_data->io_buf_put_amounts = 0; + ret =MPI_Win_create(&(ca_data->io_buf_put_amounts),sizeof(int),sizeof(int),MPI_INFO_NULL,ca_data->comm, &(ca_data->io_buf_put_amounts_window)); + + if (ca_data->async_io_outer == 1) { + ret = MPI_Win_create(ca_data->io_buf_d,ca_data->cb_buffer_size,1,MPI_INFO_NULL,ca_data->comm, &(ca_data->io_buf_window_d)); + if (ret != MPI_SUCCESS) goto fn_exit; + ca_data->io_buf_put_amounts_d = 0; + ret = MPI_Win_create(&(ca_data->io_buf_put_amounts_d),sizeof(int),sizeof(int),MPI_INFO_NULL,ca_data->comm, &(ca_data->io_buf_put_amounts_window_d)); + } + +fn_exit: + return ret; +} + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_setup + * + * Purpose: Checks if CCIO VFD options are desired, and popluates + * necessary data structures. 
+ * + * Return: Success: + * Failure: NULL + * + * Programmer: Paul Coffman & Rick Zamora + * June 13, 2018 + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_mpio_ccio_setup(const char *name, H5FD_mpio_t *file, MPI_File fh) +{ + char *do_custom_agg_rd = HDgetenv("HDF5_CCIO_RD"); + char *do_custom_agg_wr = HDgetenv("HDF5_CCIO_WR"); + char *cb_buffer_size = HDgetenv("HDF5_CCIO_CB_SIZE"); + char *cb_nodes = HDgetenv("HDF5_CCIO_CB_NODES"); + char *fs_block_size = HDgetenv("HDF5_CCIO_FS_BLOCK_SIZE"); + char *fs_block_count = HDgetenv("HDF5_CCIO_FS_BLOCK_COUNT"); + char *custom_agg_debug_str = HDgetenv("HDF5_CCIO_DEBUG"); + char *ccio_wr_method = HDgetenv("HDF5_CCIO_WR_METHOD"); + char *ccio_rd_method = HDgetenv("HDF5_CCIO_RD_METHOD"); + char *do_async_io = HDgetenv("HDF5_CCIO_ASYNC"); + char *set_cb_nodes_stride = HDgetenv("HDF5_CCIO_CB_STRIDE"); + char *use_file_system = HDgetenv("HDF5_CCIO_FS"); + char *do_topo_select = HDgetenv("HDF5_CCIO_TOPO_CB_SELECT"); + char *set_ppn = HDgetenv("HDF5_CCIO_TOPO_PPN"); + char *set_pps = HDgetenv("HDF5_CCIO_TOPO_PPS"); + char *use_fd_agg = HDgetenv("HDF5_CCIO_FD_AGG"); + int custom_agg_debug = 0; + int mpi_rank = file->mpi_rank; /* MPI rank of this process */ + int mpi_size = file->mpi_size; /* Total number of MPI processes */ + int i, rc; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_NOAPI_NOINIT + {} + + if (custom_agg_debug_str && (strcmp(custom_agg_debug_str,"yes") == 0)) + custom_agg_debug = 1; + + if (use_fd_agg && (strcmp(use_fd_agg,"yes") == 0)) + file->custom_agg_data.fslayout = GPFS; + else + file->custom_agg_data.fslayout = LUSTRE; + + /* Set some defaults for the one-sided agg algorithm */ + file->custom_agg_data.ccio_read = 0; + file->custom_agg_data.ccio_write = 0; + file->custom_agg_data.cb_nodes = 1; + file->custom_agg_data.ppn = 0; + file->custom_agg_data.pps = 0; + file->custom_agg_data.cb_buffer_size = 1048576; + file->custom_agg_data.fs_block_count = 1; + file->custom_agg_data.fs_block_size = 1048576; + file->custom_agg_data.onesided_always_rmw = 0; + file->custom_agg_data.onesided_no_rmw = 1; + file->custom_agg_data.onesided_inform_rmw = 0; + file->custom_agg_data.onesided_write_aggmethod = 1; + file->custom_agg_data.onesided_read_aggmethod = 1; + file->custom_agg_data.topo_cb_select = DEFAULT; + file->custom_agg_data.ranklist_populated = 0; + + if (do_custom_agg_wr && (strcmp(do_custom_agg_wr,"yes") == 0)) { + file->custom_agg_data.ccio_write = 1; + } + if (do_custom_agg_rd && (strcmp(do_custom_agg_rd,"yes") == 0)) { + file->custom_agg_data.ccio_read = 1; + } + + /* Check if we are using CCIO Options*/ + if ( (file->custom_agg_data.ccio_read) || (file->custom_agg_data.ccio_write) ) { + + /* By default, use env variables for agg settings */ + if ( cb_nodes ) { + file->custom_agg_data.cb_nodes = atoi( cb_nodes ); + } + if ( set_ppn ) { + file->custom_agg_data.ppn = atoi( set_ppn ); + } + if ( set_pps ) { + file->custom_agg_data.pps = atoi( set_pps ); + } + if ( cb_buffer_size ) { + file->custom_agg_data.cb_buffer_size = atoi( cb_buffer_size ); + } + if ( fs_block_count ) { + file->custom_agg_data.fs_block_count = atoi( fs_block_count ); + } + if ( fs_block_size ) { + file->custom_agg_data.fs_block_size = atoi( fs_block_size ); + } + file->custom_agg_data.comm = file->comm; + file->custom_agg_data.fh = fh; + + /* TODO: Can we handle multiple stripes per aggregator? + * For now, just pretend like the stripe size is the same as the buffer size... 
+ */ + int stripes_per_cb_buf = file->custom_agg_data.cb_buffer_size / file->custom_agg_data.fs_block_size; + if ( stripes_per_cb_buf > 1 ) { + file->custom_agg_data.fs_block_size = file->custom_agg_data.cb_buffer_size; + file->custom_agg_data.fs_block_count /= stripes_per_cb_buf; + } + + int tot_cb_bufsize = (int)(file->custom_agg_data.cb_buffer_size); + file->custom_agg_data.io_buf_put_amounts = 0; + file->custom_agg_data.io_buf_window = MPI_WIN_NULL; + file->custom_agg_data.io_buf_put_amounts_window = MPI_WIN_NULL; + + /* Determine IF and HOW asynchronous I/O will be performed */ + file->custom_agg_data.async_io_inner = 0; + file->custom_agg_data.async_io_outer = 0; + file->custom_agg_data.check_req = 0; + file->custom_agg_data.pthread_io = 0; + if (do_async_io && (strcmp(do_async_io,"yes") == 0)) { + /* Allow 'outer' pipelining if this is LUSTRE-like mapping */ + if(file->custom_agg_data.fslayout == LUSTRE) { + file->custom_agg_data.async_io_outer = 1; + file->custom_agg_data.io_buf_d = (char *) H5MM_malloc(tot_cb_bufsize*sizeof(char)); + } + /* Allow 'inner' pipelining if this is GPFS-like mapping */ + else { + file->custom_agg_data.cb_buffer_size *= 2; + tot_cb_bufsize = (int)(file->custom_agg_data.cb_buffer_size); + file->custom_agg_data.async_io_inner = 1; + file->custom_agg_data.pthread_io = 1; /* pthreads needed for current 'inner' approach */ + } + } + file->custom_agg_data.io_buf = (char *) H5MM_malloc(tot_cb_bufsize*sizeof(char)); + file->custom_agg_data.io_buf_put_amounts_d = 0; + file->custom_agg_data.io_buf_window_d = MPI_WIN_NULL; + file->custom_agg_data.io_buf_put_amounts_window_d = MPI_WIN_NULL; + file->custom_agg_data.use_dup = 0; + + if ( ccio_wr_method ) { + file->custom_agg_data.onesided_write_aggmethod = atoi( ccio_wr_method ); + if (file->custom_agg_data.onesided_write_aggmethod < 1) + file->custom_agg_data.onesided_write_aggmethod = 1; + if (file->custom_agg_data.onesided_write_aggmethod > 2) + file->custom_agg_data.onesided_write_aggmethod = 2; + } + if ( ccio_rd_method ) { + file->custom_agg_data.onesided_read_aggmethod = atoi( ccio_rd_method ); + if (file->custom_agg_data.onesided_read_aggmethod < 1) + file->custom_agg_data.onesided_read_aggmethod = 1; + if (file->custom_agg_data.onesided_read_aggmethod > 2) + file->custom_agg_data.onesided_read_aggmethod = 2; + } + + if (custom_agg_debug && (mpi_rank == 0)) { + fprintf(stdout,"Custom aggregation info on mpio_open: MPI_MAX_INFO_VAL is %d H5FD_mpio_open fh is %016lx cb_buffer_size is %d cb_nodes is %d fs_block_count is %d fs_block_size is %d\n",MPI_MAX_INFO_VAL,fh,file->custom_agg_data.cb_buffer_size,file->custom_agg_data.cb_nodes,file->custom_agg_data.fs_block_count,file->custom_agg_data.fs_block_size); + fflush(stdout); + } + + /* Generate the initial ranklist using a constant stride between ranks */ + file->custom_agg_data.ranklist = (int *) H5MM_malloc(mpi_size * sizeof(int)); + for (i=0;i<mpi_size;i++) file->custom_agg_data.ranklist[i] = i; + int cb_nodes_stride = mpi_size / file->custom_agg_data.cb_nodes; + + /* If HDF5_CCIO_CB_STRIDE is set to a reasonable value, use it */ + if (set_cb_nodes_stride) { + int set_stride_val = atoi( set_cb_nodes_stride ); + if ((set_stride_val > 0) && (set_stride_val <= cb_nodes_stride)) { + cb_nodes_stride = set_stride_val; + } + } + for (i=0;i<(file->custom_agg_data.cb_nodes);i++) { + file->custom_agg_data.ranklist[i] = i*cb_nodes_stride; + } + + /* + * Here, we can check the HDF5_CCIO_TOPO_CB_SELECT env variable. + * Use string to set AGGSelect custom_agg_data value... 
+ */
+        if (do_topo_select) {
+            if (strcmp(do_topo_select,"data") == 0) {
+                file->custom_agg_data.topo_cb_select = DATA;
+            } else if (strcmp(do_topo_select,"spread") == 0) {
+                file->custom_agg_data.topo_cb_select = SPREAD;
+            } else if (strcmp(do_topo_select,"strided") == 0) {
+                /* Striding is not really supported through the topology API,
+                 * so just use the strided rank list created above.
+                 */
+                file->custom_agg_data.topo_cb_select = DEFAULT;
+            } else if (strcmp(do_topo_select,"random") == 0) {
+                file->custom_agg_data.topo_cb_select = RANDOM;
+            }
+        }
+
+        /* Show the aggregator ranks if we are in debug mode */
+        if (custom_agg_debug && (mpi_rank == 0)) {
+            fprintf(stdout,"DEBUG: file->custom_agg_data.cb_nodes is now set to %d romio_aggregator_list is:", file->custom_agg_data.cb_nodes);
+            for (i=0;i<file->custom_agg_data.cb_nodes;i++)
+                fprintf(stdout," %d",file->custom_agg_data.ranklist[i]);
+            fprintf(stdout,"\n");
+            fflush(stdout);
+        }
+
+    }
+
+done:
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* H5FD_mpio_ccio_setup */
+
+/*-------------------------------------------------------------------------
+ * Function:    H5FD_mpio_ccio_cleanup
+ *
+ * Purpose:     Cleans up the data structures used for the CCIO VFD options
+ *
+ * Return:      Success:        SUCCEED
+ *              Failure:        FAIL
+ *
+ * Programmer:  Rick Zamora
+ *              October 25, 2018
+ *
+ *-------------------------------------------------------------------------
+ */
+static herr_t H5FD_mpio_ccio_cleanup(const H5FD_mpio_t *file)
+{
+    herr_t ret_value = SUCCEED;
+    FUNC_ENTER_NOAPI_NOINIT
+    {}
+
+    /*
+     * If doing custom aggregation, clean it up.
+     */
+    char *do_custom_agg_wr = HDgetenv("HDF5_CCIO_WR");
+    char *do_custom_agg_rd = HDgetenv("HDF5_CCIO_RD");
+    if ( (do_custom_agg_wr && (strcmp(do_custom_agg_wr,"yes") == 0)) ||
+         (do_custom_agg_rd && (strcmp(do_custom_agg_rd,"yes") == 0)) ) {
+
+        CustomAgg_FH_Data ca_data = (CustomAgg_FH_Data)&(file->custom_agg_data);
+        if (ca_data->io_buf_window != MPI_WIN_NULL)
+            if (MPI_SUCCESS != MPI_Win_free(&ca_data->io_buf_window))
+                ret_value = FAIL;
+        if (ca_data->io_buf_put_amounts_window != MPI_WIN_NULL)
+            if (MPI_SUCCESS != MPI_Win_free(&ca_data->io_buf_put_amounts_window))
+                ret_value = FAIL;
+        if (ca_data->io_buf_window_d != MPI_WIN_NULL)
+            if (MPI_SUCCESS != MPI_Win_free(&ca_data->io_buf_window_d))
+                ret_value = FAIL;
+        if (ca_data->io_buf_put_amounts_window_d != MPI_WIN_NULL)
+            if (MPI_SUCCESS != MPI_Win_free(&ca_data->io_buf_put_amounts_window_d))
+                ret_value = FAIL;
+
+        H5MM_free(file->custom_agg_data.io_buf);
+        H5MM_free(file->custom_agg_data.ranklist);
+        if(file->custom_agg_data.async_io_outer)
+            H5MM_free(file->custom_agg_data.io_buf_d);
+
+    }
+
+done:
+    FUNC_LEAVE_NOAPI(ret_value)
+} /* H5FD_mpio_ccio_cleanup */
+
+
+/*-------------------------------------------------------------------------
+ * Function:    H5FD_mpio_calc_offset_list
+ *
+ * Purpose:     Function to get the offset list of absolute file offsets
+ *              and associated lengths.
+ *
+ * Return:      Void.
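+ *
+ *              Worked example (illustrative values only): for a file flatbuf
+ *              with indices = { 0, 1024 } and blocklens = { 512, 512 }, a
+ *              memory-side size of 768 bytes, and mpi_off = 4096, the routine
+ *              returns
+ *
+ *                  offset_list = { 4096, 5120 }   len_list = { 512, 256 }
+ *                  start_offset = 4096            end_offset = 5375
+ *                  contig_access_count = 2
+ *
+ *              i.e. the last file block is truncated to the bytes actually
+ *              supplied from memory.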
+ * + *------------------------------------------------------------------------- + */ +void H5FD_mpio_calc_offset_list(ADIO_Offset_CA + memFlatBufSize, H5S_flatbuf_t *fileFlatBuf, MPI_Offset mpi_off, + ADIO_Offset_CA **offset_list_ptr, ADIO_Offset_CA + **len_list_ptr, ADIO_Offset_CA *start_offset_ptr, + ADIO_Offset_CA *end_offset_ptr, int + *contig_access_count_ptr) +{ + int i, j, k; + int st_index=0; + int contig_access_count; + ADIO_Offset_CA *len_list; + ADIO_Offset_CA *offset_list; + + /* For the data in the memFlatBuf for this process, calculate the list of offsets and + lengths in the file and determine the start and end offsets using the fileFlatBuf */ + + if ( !(fileFlatBuf->size) || (memFlatBufSize == 0) ) { + *contig_access_count_ptr = 0; + *offset_list_ptr = (ADIO_Offset_CA *) H5MM_malloc(sizeof(ADIO_Offset_CA)); + *len_list_ptr = (ADIO_Offset_CA *) H5MM_malloc(sizeof(ADIO_Offset_CA)); + + offset_list = *offset_list_ptr; + len_list = *len_list_ptr; + offset_list[0] = 0; + len_list[0] = 0; + *start_offset_ptr = 0; + *end_offset_ptr = -1; + return; + } + else { + /* first count how many entries we will need to malloc correct amount of memory*/ + ADIO_Offset_CA bytesRemaining = memFlatBufSize; + int fbindex = 0; + int contig_access_count = 0; + while (bytesRemaining > 0) { + contig_access_count++; + bytesRemaining -= fileFlatBuf->blocklens[fbindex++]; + } +#ifdef onesidedtrace + printf("memFlatBufSize is %ld contig_access_count is %d\n",memFlatBufSize,contig_access_count); + fflush(stdout); +#endif + *offset_list_ptr = (ADIO_Offset_CA *) H5MM_malloc(contig_access_count*sizeof(ADIO_Offset_CA)); + *len_list_ptr = (ADIO_Offset_CA *) H5MM_malloc(contig_access_count*sizeof(ADIO_Offset_CA)); + offset_list = *offset_list_ptr; + len_list = *len_list_ptr; + + /* now set the offset and len list */ + bytesRemaining = memFlatBufSize; + fbindex = 0; + int offlenindex = 0; + while (bytesRemaining > 0) { + if (fileFlatBuf->blocklens[fbindex] <= bytesRemaining) { + offset_list[offlenindex] = fileFlatBuf->indices[fbindex] + mpi_off; + len_list[offlenindex] = fileFlatBuf->blocklens[fbindex]; + } + else { + offset_list[offlenindex] = fileFlatBuf->indices[fbindex] + mpi_off; + len_list[offlenindex] = bytesRemaining; + } + bytesRemaining -= fileFlatBuf->blocklens[fbindex]; + fbindex++; + offlenindex++; + + } + *contig_access_count_ptr = contig_access_count; + *start_offset_ptr = offset_list[0]; + *end_offset_ptr = offset_list[offlenindex-1] + len_list[offlenindex-1] - (ADIO_Offset_CA)1; + } +} /* H5FD_mpio_calc_offset_list */ + + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_write_one_sided + * + * Purpose: Generic One-sided Collective Write Implementation + * + * Return: Void. 
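+ *
+ *              Usage note: this path is intended to be exercised only when the
+ *              CCIO options are enabled in the environment before the file is
+ *              opened with the MPIO driver.  A minimal application-side sketch
+ *              (the environment variable names are the ones parsed in
+ *              H5FD_mpio_ccio_setup above; values are illustrative only):
+ *
+ *                  #include <stdlib.h>
+ *                  #include <mpi.h>
+ *                  #include "hdf5.h"
+ *
+ *                  // after MPI_Init(), before the file is opened:
+ *                  setenv("HDF5_CCIO_WR", "yes", 1);           // enable CCIO writes
+ *                  setenv("HDF5_CCIO_CB_NODES", "16", 1);      // 16 aggregators
+ *                  setenv("HDF5_CCIO_CB_SIZE", "16777216", 1); // 16 MiB per aggregator
+ *
+ *                  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
+ *                  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
+ *                  hid_t fid  = H5Fcreate("out.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
+ *                  // ... collective H5Dwrite calls ...
+ *                  H5Fclose(fid);
+ *                  H5Pclose(fapl);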
+ *
+ *-------------------------------------------------------------------------
+ */
+void H5FD_mpio_ccio_write_one_sided(CustomAgg_FH_Data ca_data, const void *buf, MPI_Offset mpi_off,
+    H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int *error_code)
+{
+    /*
+     * This function writes a memFlatBuf into a fileFlatBuf
+     */
+
+    int i, nprocs, myrank;
+    int contig_access_count = 0;
+    ADIO_Offset_CA start_offset, end_offset, fd_size, min_st_offset, off;
+    ADIO_Offset_CA *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL, *fd_end = NULL, *end_offsets = NULL;
+    ADIO_Offset_CA *len_list = NULL;
+    int *fs_block_info = NULL;
+    ADIO_Offset_CA **buf_idx = NULL;
+    int old_error, tmp_error;
+    ADIO_Offset_CA *fs_offsets0, *fs_offsets, *count_sizes;
+
+    MPI_Comm_size(ca_data->comm, &nprocs);
+    MPI_Comm_rank(ca_data->comm, &myrank);
+
+#ifdef topo_timing
+    double endTimeTopo = 0.0;
+    double startTimeTopo = 0.0;
+    double endTime = 0.0;
+    double startTime = 0.0;
+    startTime = MPI_Wtime();
+#endif
+
+#ifdef onesidedtrace
+    printf("Rank %d - H5FD_mpio_ccio_write_one_sided - ca_data->cb_nodes is %d\n",myrank,ca_data->cb_nodes);
+    fflush(stdout);
+    /* dump the flatbufs */
+    printf("Rank %d - memFlatBuf->size is %ld fileFlatBuf->size is %ld mpi_off is %ld\n",myrank,memFlatBuf->size,fileFlatBuf->size,mpi_off);
+    int flatbufCount = memFlatBuf->count;
+    for (i=0;i<flatbufCount;i++) {
+        printf("Rank %d - memFlatBuf->indices[%d] is %ld memFlatBuf->blocklens[%d] is %ld\n",myrank,i,memFlatBuf->indices[i],i,memFlatBuf->blocklens[i]);
+    }
+    flatbufCount = fileFlatBuf->count;
+    for (i=0;i<flatbufCount;i++) {
+        printf("Rank %d - fileFlatBuf->indices[%d] is %ld fileFlatBuf->blocklens[%d] is %ld\n",myrank,i,fileFlatBuf->indices[i],i,fileFlatBuf->blocklens[i]);
+    }
+    fflush(stdout);
+#endif
+
+    /* For this process's request, calculate the list of offsets and
+     * lengths in the file and determine the start and end offsets.
+     * Note: end_offset points to the last byte-offset that will be accessed.
+     * e.g., if start_offset=0 and 100 bytes are accessed, end_offset=99
+     */
+
+    H5FD_mpio_calc_offset_list((ADIO_Offset_CA)memFlatBuf->size, fileFlatBuf, mpi_off,
+        &offset_list, &len_list, &start_offset, &end_offset, &contig_access_count);
+
+#ifdef onesidedtrace
+    printf("Rank %d - contig_access_count = %d\n",myrank,contig_access_count);
+#endif
+
+    /* Each process communicates its start and end offsets to the other
+     * processes. The result is a pair of arrays of start and end offsets,
+     * stored in order of process rank.
+     */
+    st_offsets = (ADIO_Offset_CA *) H5MM_malloc(nprocs * sizeof(ADIO_Offset_CA));
+    end_offsets = (ADIO_Offset_CA *) H5MM_malloc(nprocs * sizeof(ADIO_Offset_CA));
+
+    /* One-sided aggregation also needs the amount of data per rank, because
+     * a 1-byte access and a 0-byte access cannot be distinguished from the
+     * start and end offsets alone.
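+     * For example, with two ranks where rank 0 writes a single byte at file
+     * offset 5 and rank 1 contributes nothing, the per-rank values exchanged
+     * below are roughly
+     *
+     *     rank 0:  start_offset = 5,  end_offset =  5,  count = 1
+     *     rank 1:  start_offset = 0,  end_offset = -1,  count = 0
+     *
+     * and only the count lets the compaction loop keep rank 0's range while
+     * dropping rank 1 entirely.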
+ */ + count_sizes = (ADIO_Offset_CA *) H5MM_malloc(nprocs*sizeof(ADIO_Offset_CA)); + fs_offsets0 = (ADIO_Offset_CA *) H5MM_malloc(3*nprocs*sizeof(ADIO_Offset_CA)); + fs_offsets = (ADIO_Offset_CA *) H5MM_malloc(3*nprocs*sizeof(ADIO_Offset_CA)); + for (i=0; isize; + MPI_Allreduce( fs_offsets0, fs_offsets, nprocs*3, MPI_LONG, MPI_MAX, ca_data->comm ); + for (i=0; i 0) { + st_offsets[currentValidDataIndex] = st_offsets[i]; + end_offsets[currentValidDataIndex] = end_offsets[i]; + + lastFileOffset = MAX(lastFileOffset,end_offsets[currentValidDataIndex]); + if (firstFileOffset == -1) + firstFileOffset = st_offsets[currentValidDataIndex]; + else + firstFileOffset = MIN(firstFileOffset,st_offsets[currentValidDataIndex]); + + currentValidDataIndex++; + } + } +#ifdef onesidedtrace + printf("Rank %d - H5FD_mpio_calc_offset_list results:\n",myrank); + for (i=0;itopo_cb_select != DEFAULT) && (ca_data->ranklist_populated==0)) { +#ifdef topo_timing + startTimeTopo = MPI_Wtime(); +#endif + + topology_aware_ranklist ( fileFlatBuf->blocklens, fileFlatBuf->indices, fileFlatBuf->count, &(ca_data->ranklist[0]), ca_data->cb_buffer_size, ca_data->cb_nodes, ca_data->ppn, ca_data->pps, 0, ca_data->comm, ca_data->topo_cb_select, (int)(ca_data->fslayout == GPFS) ); + + /* Only populating ranklist when necessary */ + ca_data->ranklist_populated = 1; + +#ifdef onesidedtrace + if (myrank == 0) { + fprintf(stdout,"Topology-aware CB Selection (type %d): ca_data->cb_nodes is %d, and ranklist is:", ca_data->topo_cb_select, ca_data->cb_nodes); + for (i=0;icb_nodes;i++) + fprintf(stdout," %d",ca_data->ranklist[i]); + fprintf(stdout,"\n"); + } + MPI_Barrier(ca_data->comm); +#endif + +#ifdef topo_timing + endTimeTopo = MPI_Wtime(); +#endif + } + + /* Use GPFS-like mapping of aggregators to file data */ + if (ca_data->fslayout == GPFS) { + + calc_file_domains(st_offsets, end_offsets, + currentValidDataIndex, ca_data->cb_nodes, &min_st_offset, &fd_start, + &fd_end, &fd_size, ca_data->fs_block_size); + + /* + * Pass this datastructure to indicate we are a non-striping filesystem + * (by setting stripe size to 0). + * That is, we are NOT using the LUSTRE approach here... 
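+     *
+     * In this GPFS-like branch each aggregator owns exactly one contiguous
+     * file domain for the whole call.  A simplified model of the fd_start and
+     * fd_end arrays produced by calc_file_domains() (a sketch only; the real
+     * routine also takes fs_block_size into account):
+     *
+     *     ADIO_Offset_CA span    = lastFileOffset - firstFileOffset + 1;
+     *     ADIO_Offset_CA fd_size = (span + cb_nodes - 1) / cb_nodes;
+     *     for (i = 0; i < cb_nodes; i++) {
+     *         fd_start[i] = firstFileOffset + i * fd_size;
+     *         fd_end[i]   = MIN(fd_start[i] + fd_size - 1, lastFileOffset);
+     *     }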
+ */ + + FS_Block_Parms noStripeParms; + noStripeParms.stripeSize = 0; + noStripeParms.segmentLen = 0; + noStripeParms.stripesPerAgg = 0; + noStripeParms.segmentIter = 0; + noStripeParms.flushCB = 1; + noStripeParms.stripedLastFileOffset = 0; + noStripeParms.firstStripedIOCall = 0; + noStripeParms.lastStripedIOCall = 0; + noStripeParms.iWasUsedStripingAgg = 0; + noStripeParms.numStripesUsed = 0; + noStripeParms.amountOfStripedDataExpected = 0; + noStripeParms.bufTypeExtent = 0; + noStripeParms.lastDataTypeExtent = 0; + noStripeParms.lastFlatBufIndice = 0; + noStripeParms.lastIndiceOffset = 0; + int holeFound = 0; + + H5FD_mpio_ccio_osagg_write(ca_data, offset_list, len_list, contig_access_count, + buf, memFlatBuf, error_code, firstFileOffset, lastFileOffset, + currentValidDataIndex, fd_start, fd_end, &holeFound, &noStripeParms); + + int anyHolesFound = 0; + if (!(ca_data->onesided_no_rmw)) + MPI_Allreduce(&holeFound, &anyHolesFound, 1, MPI_INT, MPI_MAX, ca_data->comm); + if (anyHolesFound == 0) { + H5MM_free(offset_list); + H5MM_free(len_list); + H5MM_free(st_offsets); + H5MM_free(end_offsets); + H5MM_free(fd_start); + H5MM_free(fd_end); + H5MM_free(count_sizes); + } + else { + /* Holes are found in the data and the user has not set + * romio_onesided_no_rmw --- set romio_onesided_always_rmw to 1 + * and re-call ADIOI_OneSidedWriteAggregation and if the user has + * romio_onesided_inform_rmw set then inform him of this condition + * and behavior. + */ + if (ca_data->onesided_inform_rmw && (myrank ==0)) { + fprintf(stderr,"Information: Holes found during one-sided " + "write aggregation algorithm --- re-running one-sided " + "write aggregation with ROMIO_ONESIDED_ALWAYS_RMW set to 1.\n"); + ca_data->onesided_always_rmw = 1; + int prev_onesided_no_rmw = ca_data->onesided_no_rmw; + ca_data->onesided_no_rmw = 1; + H5FD_mpio_ccio_osagg_write(ca_data, offset_list, len_list, contig_access_count, + buf, memFlatBuf, error_code, firstFileOffset, lastFileOffset, + currentValidDataIndex, fd_start, fd_end, &holeFound, &noStripeParms); + ca_data->onesided_no_rmw = prev_onesided_no_rmw; + H5MM_free(offset_list); + H5MM_free(len_list); + H5MM_free(st_offsets); + H5MM_free(end_offsets); + H5MM_free(fd_start); + H5MM_free(fd_end); + H5MM_free(count_sizes); + } + } + + } + /* Use LUSTRE-like mapping of aggregators to file data */ + else { + + /* Rewriting the ca_data as 'fs_block_info' (probably NOT necessary) */ + fs_block_info = (int *) H5MM_malloc(3 * sizeof(int)); + fs_block_info[0] = ca_data->fs_block_size; + fs_block_info[1] = ca_data->fs_block_count; + fs_block_info[2] = ca_data->cb_nodes; +#ifdef onesidedtrace + printf("Rank %d - ca_data->cb_buffer_size is %lu fs_block_info[0] is %d fs_block_info[1] is %d fs_block_info[2] is %d\n",myrank,ca_data->cb_buffer_size,fs_block_info[0],fs_block_info[1],fs_block_info[2]); + fflush(stdout); +#endif + + /* Async I/O - Make sure we are starting with the main buffer */ + ca_data->use_dup = 0; + + /* Iterate over 1+ aggregation rounds and write to FS when buffers are full */ + H5FD_mpio_ccio_iterate_write(ca_data, buf, fs_block_info, offset_list, len_list, mpi_off, contig_access_count, currentValidDataIndex, start_offset, end_offset, firstFileOffset, lastFileOffset, memFlatBuf, fileFlatBuf, myrank, error_code); + + /* Async I/O - Wait for any outstanding requests (we are done with this I/O call) */ + ca_data->use_dup = 0; + if (ca_data->check_req == 1) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + H5MM_free(offset_list); + 
H5MM_free(len_list); + H5MM_free(st_offsets); + H5MM_free(end_offsets); + H5MM_free(count_sizes); + H5MM_free(fs_block_info); + + } + +#ifdef topo_timing + endTime = MPI_Wtime(); + double max_frac; + double l_frac = (endTimeTopo - startTimeTopo) / (endTime - startTime); + MPI_Allreduce ( &l_frac, &max_frac, 1, MPI_DOUBLE, MPI_MAX, ca_data->comm ); + if ((myrank == 0)&& (ca_data->topo_cb_select != DEFAULT)) { + printf("WRITE: Aggregator Selection Fraction = %f\n", max_frac); + fflush(stdout); + } + MPI_Barrier(ca_data->comm); +#endif + +} /* H5FD_mpio_ccio_write_one_sided */ + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_read_one_sided + * + * Purpose: Generic One-sided Collective Read Implementation + * + * Return: Void. + * + *------------------------------------------------------------------------- + */ + void H5FD_mpio_ccio_read_one_sided(CustomAgg_FH_Data ca_data, void *buf, MPI_Offset mpi_off, + H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, + int *error_code) +{ + /* + * This function reads a fileFlatBuf into a memFlatBuf + */ + + int i, ii, nprocs, nprocs_for_coll, myrank; + int contig_access_count=0; + ADIO_Offset_CA start_offset, end_offset, fd_size, min_st_offset, off; + ADIO_Offset_CA *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL, + *fd_end = NULL, *end_offsets = NULL; + ADIO_Offset_CA *len_list = NULL; + ADIO_Offset_CA *fs_offsets0 = NULL, *fs_offsets = NULL; + ADIO_Offset_CA *count_sizes; + int *fs_block_info = NULL; + + MPI_Comm_size(ca_data->comm, &nprocs); + MPI_Comm_rank(ca_data->comm, &myrank); + +#ifdef topo_timing + double endTimeTopo = 0.0; + double startTimeTopo = 0.0; + double endTime = 0.0; + double startTime = 0.0; + startTime = MPI_Wtime(); +#endif + +#ifdef onesidedtrace + printf("Rank %d - H5FD_mpio_ccio_read_one_sided - ca_data->cb_nodes is %d\n",myrank,ca_data->cb_nodes); + fflush(stdout); + + /* dump the flatbufs */ + printf("Rank %d - memFlatBuf->size is %ld fileFlatBuf->size is %ld mpi_off is %ld\n",myrank,memFlatBuf->size,fileFlatBuf->size,mpi_off); + int flatbufCount = memFlatBuf->count; + for (i=0;iindices[%d] is %ld memFlatBuf->blocklens[%d] is %ld\n",myrank,i,memFlatBuf->indices[i],i,memFlatBuf->blocklens[i]); + } + flatbufCount = fileFlatBuf->count; + for (i=0;iindices[%d] is %ld fileFlatBuf->blocklens[%d] is %ld\n",myrank,i,fileFlatBuf->indices[i],i,fileFlatBuf->blocklens[i]); + } + fflush(stdout); +#endif + + /* For this process's request, calculate the list of offsets and + * lengths in the file and determine the start and end offsets. + * Note: end_offset points to the last byte-offset that will be accessed. + * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99 + */ + + H5FD_mpio_calc_offset_list((ADIO_Offset_CA)memFlatBuf->size, fileFlatBuf, mpi_off, + &offset_list, &len_list, &start_offset, &end_offset, &contig_access_count); + +#ifdef onesidedtrace + printf("Rank %d - contig_access_count = %d\n",myrank,contig_access_count); +#endif + + /* each process communicates its start and end offsets to other + processes. The result is an array each of start and end offsets stored + in order of process rank. */ + st_offsets = (ADIO_Offset_CA *) H5MM_malloc(nprocs*sizeof(ADIO_Offset_CA)); + end_offsets = (ADIO_Offset_CA *) H5MM_malloc(nprocs*sizeof(ADIO_Offset_CA)); + + /* One-sided aggregation needs the amount of data per rank as well because + * the difference in starting and ending offsets for 1 byte is 0 the same + * as 0 bytes so it cannot be distiguished. 
+ */ + count_sizes = (ADIO_Offset_CA *) H5MM_malloc(nprocs*sizeof(ADIO_Offset_CA)); + fs_offsets0 = (ADIO_Offset_CA *) H5MM_malloc(3*nprocs*sizeof(ADIO_Offset_CA)); + fs_offsets = (ADIO_Offset_CA *) H5MM_malloc(3*nprocs*sizeof(ADIO_Offset_CA)); + for (ii=0; iisize; + MPI_Allreduce( fs_offsets0, fs_offsets, nprocs*3, MPI_LONG, MPI_MAX, ca_data->comm ); + for (ii=0; ii 0) { + st_offsets[currentNonZeroDataIndex] = st_offsets[i]; + end_offsets[currentNonZeroDataIndex] = end_offsets[i]; + lastFileOffset = MAX(lastFileOffset,end_offsets[currentNonZeroDataIndex]); + if (firstFileOffset == -1) + firstFileOffset = st_offsets[currentNonZeroDataIndex]; + else + firstFileOffset = MIN(firstFileOffset,st_offsets[currentNonZeroDataIndex]); + currentNonZeroDataIndex++; + } + } + + /* Select Topology-aware list of cb_nodes if desired */ + if ((ca_data->topo_cb_select != DEFAULT) && (ca_data->ranklist_populated==0)) { +#ifdef topo_timing + startTimeTopo = MPI_Wtime(); +#endif + + topology_aware_ranklist ( fileFlatBuf->blocklens, fileFlatBuf->indices, fileFlatBuf->count, &(ca_data->ranklist[0]), ca_data->cb_buffer_size, ca_data->cb_nodes, ca_data->ppn, ca_data->pps, 0, ca_data->comm, ca_data->topo_cb_select, (int)(ca_data->fslayout == GPFS) ); + + /* Only populating ranklist when necessary */ + ca_data->ranklist_populated = 1; + +#ifdef onesidedtrace + if (myrank == 0) { + fprintf(stdout,"Topology-aware CB Selection: ca_data->cb_nodes is %d, and ranklist is:", ca_data->cb_nodes); + for (i=0;icb_nodes;i++) + fprintf(stdout," %d",ca_data->ranklist[i]); + fprintf(stdout,"\n"); + } + MPI_Barrier(ca_data->comm); +#endif + +#ifdef topo_timing + endTimeTopo = MPI_Wtime(); +#endif + } + + /* Use LUSTRE-style data mapping to aggs */ + if (ca_data->fslayout == LUSTRE) { + + /* Rewriting the ca_data as 'fs_block_info' (probably NOT necessary) */ + fs_block_info = (int *) H5MM_malloc(3 * sizeof(int)); + fs_block_info[0] = ca_data->fs_block_size; + fs_block_info[1] = ca_data->fs_block_count; + fs_block_info[2] = ca_data->cb_nodes; +#ifdef onesidedtrace + printf("Rank %d - ca_data->cb_buffer_size is %lu fs_block_info[0] is %d fs_block_info[1] is %d fs_block_info[2] is %d\n",myrank,ca_data->cb_buffer_size,fs_block_info[0],fs_block_info[1],fs_block_info[2]); + fflush(stdout); +#endif + + /* Async I/O - Make sure we are starting with the main buffer */ + if (ca_data->check_req == 1) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + if (ca_data->check_req_d == 1) { + MPIO_Wait(&ca_data->io_Request_d, error_code); + ca_data->check_req_d = 0; + } + ca_data->use_dup = 0; + + /* Iterate over 1+ aggregation rounds and read to mem when buffers are full */ + H5FD_mpio_ccio_iterate_read(ca_data, buf, fs_block_info, offset_list, len_list, mpi_off, contig_access_count, currentNonZeroDataIndex, start_offset, end_offset, firstFileOffset, lastFileOffset, memFlatBuf, fileFlatBuf, myrank, error_code); + + /* Async I/O - Wait for any outstanding requests (we are done with this I/O call) */ + if (ca_data->check_req == 1) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + if (ca_data->check_req_d == 1) { + MPIO_Wait(&ca_data->io_Request_d, error_code); + ca_data->check_req_d = 0; + } + ca_data->use_dup = 0; + + H5MM_free(offset_list); + H5MM_free(len_list); + H5MM_free(st_offsets); + H5MM_free(end_offsets); + H5MM_free(count_sizes); + H5MM_free(fs_block_info); + + } + /* Use GPFS-style data mapping to aggs */ + else { + + calc_file_domains(st_offsets, end_offsets, 
currentNonZeroDataIndex, ca_data->cb_nodes, &min_st_offset, &fd_start, &fd_end, &fd_size, ca_data->fs_block_size); + + /* Indicate that this is NOT a striped file system.. */ + FS_Block_Parms noStripeParms; + noStripeParms.stripeSize = 0; + noStripeParms.segmentLen = 0; + noStripeParms.stripesPerAgg = 0; + noStripeParms.segmentIter = 0; + noStripeParms.flushCB = 1; + noStripeParms.stripedLastFileOffset = 0; + noStripeParms.firstStripedIOCall = 0; + noStripeParms.lastStripedIOCall = 0; + noStripeParms.iWasUsedStripingAgg = 0; + noStripeParms.numStripesUsed = 0; + noStripeParms.amountOfStripedDataExpected = 0; + noStripeParms.bufTypeExtent = 0; + noStripeParms.lastDataTypeExtent = 0; + noStripeParms.lastFlatBufIndice = 0; + noStripeParms.lastIndiceOffset = 0; + + H5FD_mpio_ccio_osagg_read(ca_data, offset_list, len_list, contig_access_count, buf, memFlatBuf, error_code, firstFileOffset, lastFileOffset, currentNonZeroDataIndex, fd_start, fd_end, &noStripeParms, 1); + // last '1' means you SHOULD be reading in H5FD_mpio_ccio_osagg_read. + + H5MM_free(offset_list); + H5MM_free(len_list); + H5MM_free(st_offsets); + H5MM_free(end_offsets); + H5MM_free(fd_start); + H5MM_free(fd_end); + H5MM_free(count_sizes); + + } + +#ifdef topo_timing + endTime = MPI_Wtime(); + double max_frac; + double l_frac = (endTimeTopo - startTimeTopo)/(endTime - startTime); + MPI_Allreduce ( &l_frac, &max_frac, 1, MPI_DOUBLE, MPI_MAX, ca_data->comm ); + if ((myrank == 0)&& (ca_data->topo_cb_select != DEFAULT)) { + printf("READ: Aggregator Selection Fraction = %f\n", max_frac); + fflush(stdout); + } + MPI_Barrier(ca_data->comm); +#endif + +} /* H5FD_mpio_ccio_read_one_sided */ + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_iterate_write + * + * Purpose: This function calls H5FD_mpio_ccio_osagg_write + * iteratively to essentially pack stripes of data into the + * collective buffer and then flushes the buffer to the file when + * fully packed (repeating this process until all the data is + * completely written to the file). + * + * Return: Void. + * + *------------------------------------------------------------------------- + */ +void H5FD_mpio_ccio_iterate_write(CustomAgg_FH_Data ca_data, const void *buf, + int *fs_block_info, ADIO_Offset_CA *offset_list, ADIO_Offset_CA *len_list, + MPI_Offset mpi_off, int contig_access_count, int currentValidDataIndex, + ADIO_Offset_CA start_offset, ADIO_Offset_CA end_offset, + ADIO_Offset_CA firstFileOffset, ADIO_Offset_CA lastFileOffset, + H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int myrank, int *error_code) +{ + + int i; + int stripesPerAgg = ca_data->cb_buffer_size / fs_block_info[0]; + int numStripedAggs = ca_data->cb_nodes; + if (stripesPerAgg == 0) { + /* The striping unit is larger than the collective buffer size + * therefore we must abort since the buffer has already been + * allocated during the open. + */ + fprintf(stderr,"Error: The collective buffer size %d is less " + "than the fs_block_size %d - This collective I/O implementation " + "cannot continue.\n",ca_data->cb_buffer_size,fs_block_info[0]); + MPI_Abort(MPI_COMM_WORLD, 1); + } + MPI_Comm_rank(ca_data->comm, &myrank); + + /* Declare FS_Block_Parms here - these parameters will be locally managed + * for this invokation of `iterate_one_sided`. This will allow for concurrent + * one-sided collective writes via multi-threading as well as multiple communicators. 
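+     *
+     * Worked example of the segment arithmetic set up below (illustrative
+     * values only): with fs_block_size = 1 MiB, cb_nodes = 4 and
+     * cb_buffer_size = 8 MiB,
+     *
+     *     segmentLen    = cb_nodes * stripeSize       = 4 MiB
+     *     stripesPerAgg = cb_buffer_size / stripeSize = 8
+     *
+     * so a 40 MiB file range is covered by 10 segments, and each aggregator
+     * packs 8 segment-stripes into its collective buffer before the buffer is
+     * flushed to the file (segments 0-7, then the remaining segments 8-9).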
+ */ + FS_Block_Parms stripeParms; + stripeParms.stripeSize = fs_block_info[0]; + stripeParms.stripedLastFileOffset = lastFileOffset; + stripeParms.iWasUsedStripingAgg = 0; + stripeParms.numStripesUsed = 0; + stripeParms.amountOfStripedDataExpected = 0; + stripeParms.bufTypeExtent = 0; + stripeParms.lastDataTypeExtent = 0; + stripeParms.lastFlatBufIndice = 0; + stripeParms.lastIndiceOffset = 0; + + /* The general algorithm here is to divide the file up into segements, a segment + * being defined as a contiguous region of the file which has up to one occurrence + * of each stripe - the data for each stripe being written out by a particular + * aggregator. The segmentLen is the maximum size in bytes of each segment + * (stripeSize*number of aggs). Iteratively call H5FD_mpio_ccio_osagg_write + * for each segment to aggregate the data to the collective buffers, but only do + * the actual write (via flushCB stripe parm) once stripesPerAgg stripes + * have been packed or the aggregation for all the data is complete, minimizing + * synchronization. + */ + stripeParms.segmentLen = ((ADIO_Offset_CA)numStripedAggs)*((ADIO_Offset_CA)(fs_block_info[0])); + + /* These arrays define the file offsets for the stripes for a given segment - similar + * to the concept of file domains in GPFS, essentially file domeains for the segment. + */ + ADIO_Offset_CA *segment_stripe_start = (ADIO_Offset_CA *) H5MM_malloc(numStripedAggs*sizeof(ADIO_Offset_CA)); + ADIO_Offset_CA *segment_stripe_end = (ADIO_Offset_CA *) H5MM_malloc(numStripedAggs*sizeof(ADIO_Offset_CA)); + + /* Find the actual range of stripes in the file that have data in the offset + * ranges being written -- skip holes at the front and back of the file. + */ + int currentOffsetListIndex = 0; + int fileSegmentIter = 0; + int startingStripeWithData = 0; + int foundStartingStripeWithData = 0; + while (!foundStartingStripeWithData) { + if ( ((startingStripeWithData+1) * (ADIO_Offset_CA)(fs_block_info[0])) > firstFileOffset) + foundStartingStripeWithData = 1; + else + startingStripeWithData++; + } + + ADIO_Offset_CA currentSegementOffset = (ADIO_Offset_CA)startingStripeWithData * (ADIO_Offset_CA)(fs_block_info[0]); + + int numSegments = (int) ((lastFileOffset+(ADIO_Offset_CA)1 - currentSegementOffset)/stripeParms.segmentLen); + if ((lastFileOffset+(ADIO_Offset_CA)1 - currentSegementOffset)%stripeParms.segmentLen > 0) + numSegments++; + +#ifdef onesidedtrace + printf("Rank %d - H5FD_mpio_ccio_iterate_write ca_data->cb_nodes is %d numStripedAggs is %d numSegments is %d start_offset is %ld end_offset is %ld firstFileOffset is %ld lastFileOffset is %ld\n",myrank,ca_data->cb_nodes,numStripedAggs,numSegments,start_offset,end_offset,firstFileOffset,lastFileOffset); + fflush(stdout); +#endif + + /* To support read-modify-write use a while-loop to redo the aggregation if necessary + * to fill in the holes. + */ + int doAggregation = 1; + int holeFound = 0; + + /* Remember onesided_no_rmw setting if we have to re-do + * the aggregation if holes are found. + */ + int prev_onesided_no_rmw = ca_data->onesided_no_rmw; + + while (doAggregation) { + + int totalDataWrittenLastRound = 0; + + /* This variable tracks how many segment stripes we have packed into the agg + * buffers so we know when to flush to the file system. + */ + stripeParms.segmentIter = 0; + + /* stripeParms.stripesPerAgg is the number of stripes to aggregate before doing a flush. 
+ */ + stripeParms.stripesPerAgg = stripesPerAgg; + if (stripeParms.stripesPerAgg > numSegments) + stripeParms.stripesPerAgg = numSegments; + + for (fileSegmentIter=0;fileSegmentIter < numSegments;fileSegmentIter++) { + + int dataWrittenThisRound = 0; + + /* Define the segment range in terms of file offsets. + */ + ADIO_Offset_CA segmentFirstFileOffset = currentSegementOffset; + if ((currentSegementOffset+stripeParms.segmentLen-(ADIO_Offset_CA)1) > lastFileOffset) + currentSegementOffset = lastFileOffset; + else + currentSegementOffset += (stripeParms.segmentLen-(ADIO_Offset_CA)1); + ADIO_Offset_CA segmentLastFileOffset = currentSegementOffset; + currentSegementOffset++; + + ADIO_Offset_CA segment_stripe_offset = segmentFirstFileOffset; + for (i=0;i segment_stripe_offset) + segment_stripe_start[i] = firstFileOffset; + else + segment_stripe_start[i] = segment_stripe_offset; + if ((segment_stripe_offset + (ADIO_Offset_CA)(fs_block_info[0])) > lastFileOffset) + segment_stripe_end[i] = lastFileOffset; + else + segment_stripe_end[i] = segment_stripe_offset + (ADIO_Offset_CA)(fs_block_info[0]) - (ADIO_Offset_CA)1; + segment_stripe_offset += (ADIO_Offset_CA)(fs_block_info[0]); + } + + /* In the interest of performance for non-contiguous data with large offset lists + * essentially modify the given offset and length list appropriately for this segment + * and then pass pointers to the sections of the lists being used for this segment + * to H5FD_mpio_ccio_osagg_write. Remember how we have modified the list for this + * segment, and then restore it appropriately after processing for this segment has + * concluded, so it is ready for the next segment. + */ + int segmentContigAccessCount = 0; + int startingOffsetListIndex = -1; + int endingOffsetListIndex = -1; + ADIO_Offset_CA startingOffsetAdvancement = 0; + ADIO_Offset_CA startingLenTrim = 0; + ADIO_Offset_CA endingLenTrim = 0; + + while ( ( ( offset_list[currentOffsetListIndex] + ((ADIO_Offset_CA)(len_list[currentOffsetListIndex]))-(ADIO_Offset_CA)1 ) < segmentFirstFileOffset) + && (currentOffsetListIndex < (contig_access_count-1) ) ) + currentOffsetListIndex++; + startingOffsetListIndex = currentOffsetListIndex; + endingOffsetListIndex = currentOffsetListIndex; + int offsetInSegment = 0; + ADIO_Offset_CA offsetStart = offset_list[currentOffsetListIndex]; + ADIO_Offset_CA offsetEnd = (offset_list[currentOffsetListIndex] + ((ADIO_Offset_CA)(len_list[currentOffsetListIndex]))-(ADIO_Offset_CA)1); + + if (len_list[currentOffsetListIndex] == 0) + offsetInSegment = 0; + else if ((offsetStart >= segmentFirstFileOffset) && (offsetStart <= segmentLastFileOffset)) { + offsetInSegment = 1; + } + else if ((offsetEnd >= segmentFirstFileOffset) && (offsetEnd <= segmentLastFileOffset)) { + offsetInSegment = 1; + } + else if ((offsetStart <= segmentFirstFileOffset) && (offsetEnd >= segmentLastFileOffset)) { + offsetInSegment = 1; + } + + if (!offsetInSegment) { + segmentContigAccessCount = 0; + } + else { + /* We are in the segment, advance currentOffsetListIndex until we are out of segment. + */ + segmentContigAccessCount = 1; + + while ((offset_list[currentOffsetListIndex] <= segmentLastFileOffset) && (currentOffsetListIndex < contig_access_count)) { + dataWrittenThisRound += (int) len_list[currentOffsetListIndex]; + currentOffsetListIndex++; + } + + if (currentOffsetListIndex > startingOffsetListIndex) { + /* If we did advance, if we are at the end need to check if we are still in segment. 
+ */ + if (currentOffsetListIndex == contig_access_count) { + currentOffsetListIndex--; + } + else if (offset_list[currentOffsetListIndex] > segmentLastFileOffset) { + /* We advanced into the last one and it still in the segment. + */ + currentOffsetListIndex--; + } + else { + dataWrittenThisRound += (int) len_list[currentOffsetListIndex]; + } + segmentContigAccessCount += (currentOffsetListIndex-startingOffsetListIndex); + endingOffsetListIndex = currentOffsetListIndex; + } + } + + if (segmentContigAccessCount > 0) { + /* Trim edges here so all data in the offset list range fits exactly in the segment. + */ + if (offset_list[startingOffsetListIndex] < segmentFirstFileOffset) { + startingOffsetAdvancement = segmentFirstFileOffset-offset_list[startingOffsetListIndex]; + offset_list[startingOffsetListIndex] += startingOffsetAdvancement; + dataWrittenThisRound -= (int) startingOffsetAdvancement; + startingLenTrim = startingOffsetAdvancement; + len_list[startingOffsetListIndex] -= startingLenTrim; + } + + if ((offset_list[endingOffsetListIndex] + ((ADIO_Offset_CA)(len_list[endingOffsetListIndex]))-(ADIO_Offset_CA)1) > segmentLastFileOffset) { + endingLenTrim = offset_list[endingOffsetListIndex]+ ((ADIO_Offset_CA)(len_list[endingOffsetListIndex]))-(ADIO_Offset_CA)1 - segmentLastFileOffset; + len_list[endingOffsetListIndex] -= endingLenTrim; + dataWrittenThisRound -= (int) endingLenTrim; + } + } + + int holeFoundThisRound = 0; + + /* Once we have packed the collective buffers do the actual write. + */ + if ((stripeParms.segmentIter == (stripeParms.stripesPerAgg-1)) || (fileSegmentIter == (numSegments-1))) { + stripeParms.flushCB = 1; + } + else + stripeParms.flushCB = 0; + + stripeParms.firstStripedIOCall = 0; + stripeParms.lastStripedIOCall = 0; + if (fileSegmentIter == 0) { + stripeParms.firstStripedIOCall = 1; + } + else if (fileSegmentIter == (numSegments-1)) + stripeParms.lastStripedIOCall = 1; + + /* The difference in calls to H5FD_mpio_ccio_osagg_write is based on the whether the buftype is + * contiguous. The algorithm tracks the position in the source buffer when called + * multiple times -- in the case of contiguous data this is simple and can be externalized with + * a buffer offset, in the case of non-contiguous data this is complex and the state must be tracked + * internally, therefore no external buffer offset. Care was taken to minimize + * H5FD_mpio_ccio_osagg_write changes at the expense of some added complexity to the caller. 
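+             *
+             * In short, mirroring the two calls just below:
+             *
+             *     if (memFlatBuf->count == 1)   // contiguous source buffer
+             *         ..._osagg_write(..., buf + totalDataWrittenLastRound, ...);
+             *     else                          // non-contiguous: the position is
+             *         ..._osagg_write(..., buf, ...);   // tracked internally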
+ */ + +#ifdef onesidedtrace + if (myrank == 0) { + int j; + printf("\n\nRank %d - Segment iteration %d stripeParms.flushCB is %d aggregator placement and assignment over %d aggs is:\n",myrank,fileSegmentIter,stripeParms.flushCB, ca_data->cb_nodes); + for (j=0;jcb_nodes;j++) + printf("Rank %d - agg rank %d writing to offset range %ld to %ld\n",myrank,ca_data->ranklist[j],segment_stripe_start[j],segment_stripe_end[j]); + printf("\n\n"); + } +#endif + + if (memFlatBuf->count == 1) { + H5FD_mpio_ccio_osagg_write(ca_data,(ADIO_Offset_CA*)&(offset_list[startingOffsetListIndex]), (ADIO_Offset_CA*)&(len_list[startingOffsetListIndex]), segmentContigAccessCount, buf+totalDataWrittenLastRound, memFlatBuf, error_code, segmentFirstFileOffset, segmentLastFileOffset, currentValidDataIndex, segment_stripe_start, segment_stripe_end, 0,&stripeParms); + } + else { + H5FD_mpio_ccio_osagg_write(ca_data,(ADIO_Offset_CA*)&(offset_list[startingOffsetListIndex]), (ADIO_Offset_CA*)&(len_list[startingOffsetListIndex]), segmentContigAccessCount, buf, memFlatBuf, error_code, segmentFirstFileOffset, segmentLastFileOffset, currentValidDataIndex, segment_stripe_start, segment_stripe_end, 0,&stripeParms); + } + + /* Async I/O - Switch between buffers */ + if(ca_data->async_io_outer) { + ca_data->use_dup = (ca_data->use_dup + 1) % 2; + } + + if (stripeParms.flushCB) { + stripeParms.segmentIter = 0; + if (stripesPerAgg > (numSegments-fileSegmentIter-1)) + stripeParms.stripesPerAgg = numSegments-fileSegmentIter-1; + else + stripeParms.stripesPerAgg = stripesPerAgg; + } + else + stripeParms.segmentIter++; + + if (holeFoundThisRound) + holeFound = 1; + + /* If we know we won't be doing a pre-read in a subsequent call to + * H5FD_mpio_ccio_osagg_write which will have a barrier to keep + * feeder ranks from doing rma to the collective buffer before the + * write completes that we told it do with the stripeParms.flushCB + * flag then we need to do a barrier here. + */ + if (!ca_data->onesided_always_rmw && stripeParms.flushCB) { + if (fileSegmentIter < (numSegments-1)) { + MPI_Barrier(ca_data->comm); + } + } + + /* Restore the offset_list and len_list to values that are ready for the + * next iteration. + */ + if (segmentContigAccessCount > 0) { + offset_list[endingOffsetListIndex] += len_list[endingOffsetListIndex]; + len_list[endingOffsetListIndex] = endingLenTrim; + } + totalDataWrittenLastRound += dataWrittenThisRound; + } /* fileSegmentIter for-loop */ + + /* Check for holes in the data unless onesided_no_rmw is set. + * If a hole is found redo the entire aggregation and write. + */ + if (!ca_data->onesided_no_rmw) { + int anyHolesFound = 0; + MPI_Allreduce(&holeFound, &anyHolesFound, 1, MPI_INT, MPI_MAX, ca_data->comm); + + if (anyHolesFound) { + H5MM_free(offset_list); + H5MM_free(len_list); + H5FD_mpio_calc_offset_list((ADIO_Offset_CA)memFlatBuf->size, fileFlatBuf, mpi_off, + &offset_list, &len_list, &start_offset, &end_offset, &contig_access_count); + + currentSegementOffset = (ADIO_Offset_CA)startingStripeWithData * (ADIO_Offset_CA)(fs_block_info[0]); + ca_data->onesided_always_rmw = 1; + ca_data->onesided_no_rmw = 1; + + /* Holes are found in the data and the user has not set + * onesided_no_rmw --- set onesided_always_rmw to 1 + * and redo the entire aggregation and write and if the user has + * onesided_inform_rmw set then inform him of this condition + * and behavior. 
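+                 *
+                 * A "hole" here is a byte range inside a stripe that no rank
+                 * wrote at all, e.g. ranks collectively covering [0, 4K) and
+                 * [8K, 1M) of a 1 MiB stripe leave [4K, 8K) untouched; flushing
+                 * the full stripe from the collective buffer would clobber those
+                 * bytes unless the aggregator first reads the stripe back in
+                 * (read-modify-write).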
+ */ + if (ca_data->onesided_inform_rmw && (myrank ==0)) { + fprintf(stderr,"Information: Holes found during one-sided " + "write aggregation algorithm --- re-running one-sided " + "write aggregation with onesided_always_rmw set to 1.\n"); + } + } + else + doAggregation = 0; + } + else + doAggregation = 0; + + } // while doAggregation + ca_data->onesided_no_rmw = prev_onesided_no_rmw; + + H5MM_free(segment_stripe_start); + H5MM_free(segment_stripe_end); + +} /* H5FD_mpio_ccio_iterate_write */ + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_iterate_read + * + * Purpose: This function calls H5FD_mpio_ccio_osagg_read + * iteratively to perform "rounds" of one-sided collective + * data aggregation. + * + * Return: Void. + * + *------------------------------------------------------------------------- + */ + void H5FD_mpio_ccio_iterate_read(CustomAgg_FH_Data ca_data, void *buf, + int *fs_block_info, ADIO_Offset_CA *offset_list, ADIO_Offset_CA *len_list, + MPI_Offset mpi_off, int contig_access_count, int currentValidDataIndex, + ADIO_Offset_CA start_offset, ADIO_Offset_CA end_offset, + ADIO_Offset_CA firstFileOffset, ADIO_Offset_CA lastFileOffset, + H5S_flatbuf_t *memFlatBuf, H5S_flatbuf_t *fileFlatBuf, int myrank, int *error_code) +{ + + int i; + int stripesPerAgg = ca_data->cb_buffer_size / fs_block_info[0]; + int numStripedAggs = ca_data->cb_nodes; + if (stripesPerAgg == 0) { + /* The striping unit is larger than the collective buffer size + * therefore we must abort since the buffer has already been + * allocated during the open. + */ + fprintf(stderr,"Error: The collective buffer size %d is less " + "than the fs_block_size %d - This collective I/O implementation " + "cannot continue.\n",ca_data->cb_buffer_size,fs_block_info[0]); + MPI_Abort(MPI_COMM_WORLD, 1); + } + MPI_Comm_rank(ca_data->comm, &myrank); + + /* Declare ADIOI_OneSidedStripeParms here - these parameters will be locally managed + * for this invokation of H5FD_mpio_ccio_iterate_read. This will allow for concurrent + * one-sided collective writes via multi-threading as well as multiple communicators. + */ + FS_Block_Parms stripeParms; + stripeParms.stripeSize = fs_block_info[0]; /* stripe_size */ + stripeParms.stripedLastFileOffset = lastFileOffset; + stripeParms.iWasUsedStripingAgg = 0; + stripeParms.numStripesUsed = 0; + stripeParms.amountOfStripedDataExpected = 0; + stripeParms.bufTypeExtent = 0; + stripeParms.lastDataTypeExtent = 0; + stripeParms.lastFlatBufIndice = 0; + stripeParms.lastIndiceOffset = 0; + + /* The general algorithm here is to divide the file up into segments, a segment + * being defined as a contiguous region of the file which has up to `numStripedAggs` + * occurrences of each stripe - the data for each stripe being READ by a particular + * aggregator. The segmentLen is the maximum size in bytes of each segment + * (stripeSize * numStripedAggs). Here, we iteratively call + * H5FD_mpio_ccio_osagg_read for each segment to READ the data. + */ + stripeParms.segmentLen = ((ADIO_Offset_CA)numStripedAggs)*((ADIO_Offset_CA)(fs_block_info[0])); + + /* These arrays define the file offsets for the stripes for a given segment - similar + * to the concept of file domains in GPFS, essentially file domeains for the segment. 
+ */ + ADIO_Offset_CA *segment_stripe_start = (ADIO_Offset_CA *) H5MM_malloc(numStripedAggs*sizeof(ADIO_Offset_CA)); + ADIO_Offset_CA *segment_stripe_end = (ADIO_Offset_CA *) H5MM_malloc(numStripedAggs*sizeof(ADIO_Offset_CA)); + ADIO_Offset_CA *segment_stripe_start_next; + ADIO_Offset_CA *segment_stripe_end_next; + if (ca_data->async_io_outer) { + segment_stripe_start_next = (ADIO_Offset_CA *) H5MM_malloc(numStripedAggs*sizeof(ADIO_Offset_CA)); + segment_stripe_end_next = (ADIO_Offset_CA *) H5MM_malloc(numStripedAggs*sizeof(ADIO_Offset_CA)); + } + + /* Find the actual range of stripes in the file that have data in the offset + * ranges being written -- skip holes at the front and back of the file. + */ + int currentOffsetListIndex = 0; + int fileSegmentIter = 0; + int startingStripeWithData = 0; + int foundStartingStripeWithData = 0; + while (!foundStartingStripeWithData) { + if ( ((startingStripeWithData+1) * (ADIO_Offset_CA)(fs_block_info[0])) > firstFileOffset) + foundStartingStripeWithData = 1; + else + startingStripeWithData++; + } + + /* currentSegementOffset = Offset to beginning of first stripe with data to be read */ + ADIO_Offset_CA currentSegementOffset = (ADIO_Offset_CA)startingStripeWithData * (ADIO_Offset_CA)(fs_block_info[0]); + + /* How many "rounds" of segements will we need to iterate through here */ + int numSegments = (int) ((lastFileOffset+(ADIO_Offset_CA)1 - currentSegementOffset)/stripeParms.segmentLen); + if ((lastFileOffset+(ADIO_Offset_CA)1 - currentSegementOffset)%stripeParms.segmentLen > 0) + numSegments++; + +#ifdef onesidedtrace + printf("Rank %d - H5FD_mpio_ccio_iterate_read ca_data->cb_nodes is %d numStripedAggs is %d numSegments is %d start_offset is %ld end_offset is %ld firstFileOffset is %ld lastFileOffset is %ld\n",myrank,ca_data->cb_nodes,numStripedAggs,numSegments,start_offset,end_offset,firstFileOffset,lastFileOffset); + fflush(stdout); +#endif + + /* This variable tracks how many segment stripes we have packed into the agg + * buffers so we know when the buffers are full. + */ + stripeParms.segmentIter = 0; + + /* stripeParms.stripesPerAgg is the number of stripes the aggregator must + * read to fill it's buffer. + */ + stripeParms.stripesPerAgg = stripesPerAgg; + if (stripeParms.stripesPerAgg > numSegments) + stripeParms.stripesPerAgg = numSegments; + + int totalDataReadLastRound = 0; + + /* Use 'next' read offsets for async I/O */ + ADIO_Offset_CA segmentFirstFileOffset_next, segmentLastFileOffset_next; + + /* Async I/O - Start with use_dup==0 */ + ca_data->use_dup = 0; + + /* Now, we iterate trhough all the segments that we want to read */ + for (fileSegmentIter=0;fileSegmentIter < numSegments;fileSegmentIter++) { + + int dataReadThisRound = 0; + + ADIO_Offset_CA segmentFirstFileOffset, segmentLastFileOffset; + + /* Define the segment range in terms of a file offsets. 
+ * Just increment the offset from the previous 'currentSegementOffset' + */ + segmentFirstFileOffset = currentSegementOffset; + if ((currentSegementOffset+stripeParms.segmentLen-(ADIO_Offset_CA)1) > lastFileOffset) + currentSegementOffset = lastFileOffset; + else + currentSegementOffset += (stripeParms.segmentLen-(ADIO_Offset_CA)1); + segmentLastFileOffset = currentSegementOffset; + currentSegementOffset++; // shifting by one byte offset + + ADIO_Offset_CA segment_stripe_offset = segmentFirstFileOffset; + for (i=0;i segment_stripe_offset) + segment_stripe_start[i] = firstFileOffset; + else + segment_stripe_start[i] = segment_stripe_offset; + if ((segment_stripe_offset + (ADIO_Offset_CA)(fs_block_info[0])) > lastFileOffset) + segment_stripe_end[i] = lastFileOffset; + else + segment_stripe_end[i] = segment_stripe_offset + (ADIO_Offset_CA)(fs_block_info[0]) - (ADIO_Offset_CA)1; + segment_stripe_offset += (ADIO_Offset_CA)(fs_block_info[0]); + } + + if ((ca_data->async_io_outer) && (fileSegmentIter<(numSegments-1)) && (numSegments>1)) { + ADIO_Offset_CA cso_prev = currentSegementOffset; + segmentFirstFileOffset_next = cso_prev; + if ((cso_prev+stripeParms.segmentLen-(ADIO_Offset_CA)1) > lastFileOffset) + cso_prev = lastFileOffset; + else + cso_prev += (stripeParms.segmentLen-(ADIO_Offset_CA)1); + segmentLastFileOffset_next = cso_prev; + + ADIO_Offset_CA sso_next = segmentFirstFileOffset_next; + for (i=0;i sso_next) + segment_stripe_start_next[i] = firstFileOffset; + else + segment_stripe_start_next[i] = sso_next; + if ((sso_next + (ADIO_Offset_CA)(fs_block_info[0])) > lastFileOffset) + segment_stripe_end_next[i] = lastFileOffset; + else + segment_stripe_end_next[i] = sso_next + (ADIO_Offset_CA)(fs_block_info[0]) - (ADIO_Offset_CA)1; + sso_next += (ADIO_Offset_CA)(fs_block_info[0]); + } + + } + + /* In the interest of performance for non-contiguous data with large offset lists + * essentially modify the given offset and length list appropriately for this segment + * and then pass pointers to the sections of the lists being used for this segment + * to H5FD_mpio_ccio_osagg_read. Remember how we have modified the list for this + * segment, and then restore it appropriately after processing for this segment has + * concluded, so it is ready for the next segment. 
+ */ + int segmentContigAccessCount = 0; + int startingOffsetListIndex = -1; + int endingOffsetListIndex = -1; + ADIO_Offset_CA startingOffsetAdvancement = 0; + ADIO_Offset_CA startingLenTrim = 0; + ADIO_Offset_CA endingLenTrim = 0; + + while ( ((offset_list[currentOffsetListIndex] + ((ADIO_Offset_CA)(len_list[currentOffsetListIndex]))-(ADIO_Offset_CA)1) < segmentFirstFileOffset) && (currentOffsetListIndex < (contig_access_count-1))) + { + currentOffsetListIndex++; + } + + startingOffsetListIndex = currentOffsetListIndex; + endingOffsetListIndex = currentOffsetListIndex; + int offsetInSegment = 0; + ADIO_Offset_CA offsetStart = offset_list[currentOffsetListIndex]; + ADIO_Offset_CA offsetEnd = (offset_list[currentOffsetListIndex] + ((ADIO_Offset_CA)(len_list[currentOffsetListIndex]))-(ADIO_Offset_CA)1); + + if (len_list[currentOffsetListIndex] == 0) + offsetInSegment = 0; + else if ((offsetStart >= segmentFirstFileOffset) && (offsetStart <= segmentLastFileOffset)) { + offsetInSegment = 1; + } + else if ((offsetEnd >= segmentFirstFileOffset) && (offsetEnd <= segmentLastFileOffset)) { + offsetInSegment = 1; + } + else if ((offsetStart <= segmentFirstFileOffset) && (offsetEnd >= segmentLastFileOffset)) { + offsetInSegment = 1; + } + + if (!offsetInSegment) { + segmentContigAccessCount = 0; + + } + else { + /* We are in the segment, advance currentOffsetListIndex until we are out of segment. + */ + segmentContigAccessCount = 1; + + while ((offset_list[currentOffsetListIndex] <= segmentLastFileOffset) && (currentOffsetListIndex < contig_access_count)) { + dataReadThisRound += (int) len_list[currentOffsetListIndex]; + currentOffsetListIndex++; + } + + if (currentOffsetListIndex > startingOffsetListIndex) { + /* If we did advance, if we are at the end need to check if we are still in segment. + */ + if (currentOffsetListIndex == contig_access_count) { + currentOffsetListIndex--; + } + else if (offset_list[currentOffsetListIndex] > segmentLastFileOffset) { + /* We advanced into the last one and it still in the segment. + */ + currentOffsetListIndex--; + } + else { + dataReadThisRound += (int) len_list[currentOffsetListIndex]; + } + segmentContigAccessCount += (currentOffsetListIndex-startingOffsetListIndex); + endingOffsetListIndex = currentOffsetListIndex; + } + } + + if (segmentContigAccessCount > 0) { + /* Trim edges here so all data in the offset list range fits exactly in the segment. 
+ */ + if (offset_list[startingOffsetListIndex] < segmentFirstFileOffset) { + startingOffsetAdvancement = segmentFirstFileOffset-offset_list[startingOffsetListIndex]; + offset_list[startingOffsetListIndex] += startingOffsetAdvancement; + dataReadThisRound -= (int) startingOffsetAdvancement; + startingLenTrim = startingOffsetAdvancement; + len_list[startingOffsetListIndex] -= startingLenTrim; + } + + if ((offset_list[endingOffsetListIndex] + ((ADIO_Offset_CA)(len_list[endingOffsetListIndex]))-(ADIO_Offset_CA)1) > segmentLastFileOffset) { + endingLenTrim = offset_list[endingOffsetListIndex]+ ((ADIO_Offset_CA)(len_list[endingOffsetListIndex]))-(ADIO_Offset_CA)1 - segmentLastFileOffset; + len_list[endingOffsetListIndex] -= endingLenTrim; + dataReadThisRound -= (int) endingLenTrim; + } + } + + /* Once we have packed the collective buffers, set stripeParms.flushCB = 1 + * to signify this (note that stripeParms.flushCB does NOT control the actual I/O for reading) + * That is, we are reading on every call, so 'flushCB' isn't really necessary for reads + */ + if ((stripeParms.segmentIter == (stripeParms.stripesPerAgg-1)) || (fileSegmentIter == (numSegments-1))) { + stripeParms.flushCB = 1; + } + else + stripeParms.flushCB = 0; + + stripeParms.firstStripedIOCall = 0; + stripeParms.lastStripedIOCall = 0; + if (fileSegmentIter == 0) { + stripeParms.firstStripedIOCall = 1; + } + else if (fileSegmentIter == (numSegments-1)) + stripeParms.lastStripedIOCall = 1; + + /* The difference in calls to H5FD_mpio_ccio_osagg_read is based on the whether the buftype is + * contiguous. The algorithm tracks the position in the target buffer when called + * multiple times -- in the case of contiguous data this is simple and can be externalized with + * a buffer offset, in the case of non-contiguous data this is complex and the state must be tracked + * internally, therefore no external buffer offset. Care was taken to minimize + * H5FD_mpio_ccio_osagg_read changes at the expense of some added complexity to the caller. 
+ */ + + /* Async I/O - Create a pipeline of 'reads' */ + if ((ca_data->async_io_outer) && (fileSegmentIter==0) && (numSegments>1)) { + + /* Read data from file into aggregator buffers */ + H5FD_mpio_ccio_file_read(ca_data, error_code, segmentFirstFileOffset, segmentLastFileOffset, segment_stripe_start, segment_stripe_end); + + /* Async I/O - Start prefetch of next iteration with duplite buffer */ + ca_data->use_dup = (ca_data->use_dup + 1) % 2; + + /* Read data from file into aggregator buffers for NEXT interation */ + H5FD_mpio_ccio_file_read(ca_data, error_code, segmentFirstFileOffset_next, segmentLastFileOffset_next, segment_stripe_start_next, segment_stripe_end_next); + + /* Async I/O - Switch back to current buffer */ + ca_data->use_dup = (ca_data->use_dup + 1) % 2; + + } else if ((ca_data->async_io_outer) && (fileSegmentIter<(numSegments-1)) && (numSegments>1)) { + + /* Async I/O - Start prefetch of next iteration with duplite buffer */ + ca_data->use_dup = (ca_data->use_dup + 1) % 2; + + /* Read data from file into aggregator buffers for NEXT interation */ + H5FD_mpio_ccio_file_read(ca_data, error_code, segmentFirstFileOffset_next, segmentLastFileOffset_next, segment_stripe_start_next, segment_stripe_end_next); + + /* Async I/O - Switch back to current buffer */ + ca_data->use_dup = (ca_data->use_dup + 1) % 2; + + } else if ((!ca_data->async_io_outer) || (numSegments<2)) { + + /* Read data from file into aggregator buffers */ + H5FD_mpio_ccio_file_read(ca_data, error_code, segmentFirstFileOffset, segmentLastFileOffset, segment_stripe_start, segment_stripe_end); + + } + + /* Async I/O - Wait for necessary buffer to be ready for RMA */ + if (ca_data->use_dup && ca_data->check_req_d) { + MPIO_Wait(&ca_data->io_Request_d, error_code); + ca_data->check_req_d = 0; + } else if (!ca_data->use_dup && ca_data->check_req) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + if (memFlatBuf->count == 1) { + + /* Ranks perform one-sided read of data from collective buffers */ + H5FD_mpio_ccio_osagg_read(ca_data,(ADIO_Offset_CA*)&(offset_list[startingOffsetListIndex]), (ADIO_Offset_CA*)&(len_list[startingOffsetListIndex]), segmentContigAccessCount, buf+totalDataReadLastRound, memFlatBuf, error_code, segmentFirstFileOffset, segmentLastFileOffset, currentValidDataIndex, segment_stripe_start, segment_stripe_end, &stripeParms, 0); // Last '0' means the file read should be skipped in this call + + } else { + + /* Ranks perform one-sided read of data from collective buffers */ + H5FD_mpio_ccio_osagg_read(ca_data,(ADIO_Offset_CA*)&(offset_list[startingOffsetListIndex]), (ADIO_Offset_CA*)&(len_list[startingOffsetListIndex]), segmentContigAccessCount, buf, memFlatBuf, error_code, segmentFirstFileOffset, segmentLastFileOffset, currentValidDataIndex, segment_stripe_start, segment_stripe_end, &stripeParms, 0); // Last '0' means the file read should be skipped in this call + + } + + /* Async I/O - change 'current' buffer */ + if ((ca_data->async_io_outer) && (numSegments>1)) { + ca_data->use_dup = (ca_data->use_dup + 1) % 2; + } + + //if (stripeParms.flushCB) { + stripeParms.segmentIter = 0; + if (stripesPerAgg > (numSegments-fileSegmentIter-1)) + stripeParms.stripesPerAgg = numSegments-fileSegmentIter-1; + else + stripeParms.stripesPerAgg = stripesPerAgg; + //} + //else + // stripeParms.segmentIter++; + + /* Need barrier here. 
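+             * Without it, a rank that finishes its one-sided gets early could
+             * move on to the next segment while other ranks are still pulling
+             * the current segment out of this aggregator's collective buffer,
+             * which is about to be overwritten with the next segment's file
+             * data.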
+ */ + if (fileSegmentIter < (numSegments-1)) { + MPI_Barrier(ca_data->comm); + } + + /* Restore the offset_list and len_list to values that are ready for the + * next iteration. + */ + if (segmentContigAccessCount > 0) { + offset_list[endingOffsetListIndex] += len_list[endingOffsetListIndex]; + len_list[endingOffsetListIndex] = endingLenTrim; + } + totalDataReadLastRound += dataReadThisRound; + + } // fileSegmentIter for-loop + + H5MM_free(segment_stripe_start); + H5MM_free(segment_stripe_end); + if (ca_data->async_io_outer) { + H5MM_free(segment_stripe_start_next); + H5MM_free(segment_stripe_end_next); + } + +} /* End IterateOneSidedRead */ + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_nc_buffer_advance + * + * This funtion packs a contigous buffer of data from the non-contgious source + * buffer for a specified chunk of data and advances the FDSourceBufferState + * machinery, so subsequent calls with the FDSourceBufferState will return the + * next linear chunk. + * Parameters: + * in: sourceDataBuffer - pointer to source data buffer. + * in: flatBuf - pointer to flattened source data buffer + * in: targetNumBytes - number of bytes to return and advance. + * in: packing - whether data is being packed from the source buffer to the + * packed buffer (1) or unpacked from the packed buffer to the source + * buffer (0) + * in/out: currentFDSourceBufferState - pointer to FDSourceBufferState structure, current + * data used as starting point, will be updated with + * the new state after targetNumBytes advance. + * out: packedDataBufer - pointer to the output packed data buffer. If the + * value is NULL then no data will be written. + * + *------------------------------------------------------------------------- + */ +inline static void H5FD_mpio_nc_buffer_advance(char *sourceDataBuffer, + H5S_flatbuf_t *flatBuf, int targetNumBytes, int packing, + FDSourceBufferState_CA *currentFDSourceBufferState, char *packedDataBufer) +{ + /* + * Make currentDataTypeExtent and bufTypeExtent ADIO_Offset_CA since they are + * used in offset calculations + */ + ADIO_Offset_CA currentIndiceOffset = currentFDSourceBufferState->indiceOffset; + ADIO_Offset_CA bufTypeExtent = (ADIO_Offset_CA)currentFDSourceBufferState->bufTypeExtent; + ADIO_Offset_CA currentDataTypeExtent = currentFDSourceBufferState->dataTypeExtent; + int currentFlatBufIndice = currentFDSourceBufferState->flatBufIndice; + int targetSendDataIndex = 0; + +#ifdef onesidedtrace + printf("H5FD_mpio_nc_buffer_advance: currentFlatBufIndice is %d currentDataTypeExtent is %ld currentIndiceOffset is %ld\n",currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset); +#endif + + int remainingBytesToLoad = targetNumBytes; + while (remainingBytesToLoad > 0) { + if ((flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset) >= remainingBytesToLoad) { // we can get the rest of our data from this indice + ADIO_Offset_CA physicalSourceBufferOffset = (currentDataTypeExtent * bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset; + +#ifdef onesidedtrace + printf("loading remainingBytesToLoad %d from src buffer offset %ld to targetSendDataIndex %d\n",remainingBytesToLoad,physicalSourceBufferOffset,targetSendDataIndex); +#endif + + if (packedDataBufer != NULL) { + if (packing) + memcpy(&(packedDataBufer[targetSendDataIndex]),&(sourceDataBuffer[physicalSourceBufferOffset]),remainingBytesToLoad); + else + 
memcpy(&(sourceDataBuffer[physicalSourceBufferOffset]),&(packedDataBufer[targetSendDataIndex]),remainingBytesToLoad); + } + + targetSendDataIndex += remainingBytesToLoad; + currentIndiceOffset += (ADIO_Offset_CA)remainingBytesToLoad; + if (currentIndiceOffset >= flatBuf->blocklens[currentFlatBufIndice]) { + currentIndiceOffset = (ADIO_Offset_CA)0; + currentFlatBufIndice++; + if (currentFlatBufIndice == flatBuf->count) { + currentFlatBufIndice = 0; + currentDataTypeExtent++; + } + } + remainingBytesToLoad = 0; + + } + else { // we can only get part of our data from this indice + ADIO_Offset_CA amountDataToLoad = (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset); + ADIO_Offset_CA physicalSourceBufferOffset = (currentDataTypeExtent * bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset; + +#ifdef onesidedtrace + printf("loading amountDataToLoad %d from src buffer offset %ld to targetSendDataIndex %d\n",amountDataToLoad,physicalSourceBufferOffset,targetSendDataIndex); +#endif + if (packedDataBufer != NULL) { + if (packing) + memcpy(&(packedDataBufer[targetSendDataIndex]),&(sourceDataBuffer[physicalSourceBufferOffset]),amountDataToLoad); + else + memcpy(&(sourceDataBuffer[physicalSourceBufferOffset]),&(packedDataBufer[targetSendDataIndex]),amountDataToLoad); + } + + targetSendDataIndex += amountDataToLoad; + currentIndiceOffset = (ADIO_Offset_CA)0; + currentFlatBufIndice++; + if (currentFlatBufIndice == flatBuf->count) { + currentFlatBufIndice = 0; + currentDataTypeExtent++; + } + remainingBytesToLoad -= amountDataToLoad; + } + } // while + + /* + * Update machinery with new flatbuf position + */ + currentFDSourceBufferState->indiceOffset = currentIndiceOffset; + currentFDSourceBufferState->dataTypeExtent = currentDataTypeExtent; + currentFDSourceBufferState->flatBufIndice = currentFlatBufIndice; +#ifdef onesidedtrace + printf("source buf advanced to currentFlatBufIndice %d currentDataTypeExtent %ld currentIndiceOffset %ld\n",currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset); +#endif +}; /* H5FD_mpio_nc_buffer_advance */ + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_osagg_write + * + * Purpose: + * + * The H5FD_mpio_ccio_osagg_write algorithm is called once + * for each segment of data, a segment being defined as a contiguous region of the file which + * is the size of one striping unit times the number of aggregators. For lustre the striping unit + * corresponds with the actual file stripe, in the case of gpfs these are file domains. + * Each call effectively packs one striping unit of data into the collective buffer on each agg, + * with additional parameters which govern when to flush the collective buffer to the file. + * Therefore in practice the collective write call for a file system such as + * lustre on a dataset composed of multiple segments would call the algorithm several times without a + * flush parameter to fill the collective buffers with multiple stripes of data, before calling it again to flush + * the collective buffer to the file system. In this fashion the synchronization can be minimized as that + * only needs to occur during the actual read from or write to the file system. In the case of gpfs + * this function is called just once. 
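The flatbuf walk above is easier to follow in isolation: a flattened buffer is just parallel (offset, length) arrays repeated every extent bytes, and packing N bytes means copying piecewise while remembering the block index, the offset inside that block, and how many full extents have been consumed. The sketch below uses simplified stand-in types rather than the actual H5S_flatbuf_t / FDSourceBufferState_CA structures:

    #include <string.h>

    /* Simplified flattened-buffer description and cursor (illustrative only). */
    typedef struct { int count; long *indices; long *blocklens; long extent; } flatbuf_t;
    typedef struct { int idx; long off_in_block; long extent_count; } flat_cursor_t;

    /* Pack nbytes from the non-contiguous src (described by fb) into dst,
     * advancing cur so the next call continues where this one stopped. */
    static void flat_pack(const char *src, const flatbuf_t *fb, long nbytes,
                          flat_cursor_t *cur, char *dst)
    {
        long copied = 0;
        while (copied < nbytes) {
            long avail = fb->blocklens[cur->idx] - cur->off_in_block;
            long take  = (avail < nbytes - copied) ? avail : nbytes - copied;
            long src_off = cur->extent_count * fb->extent
                           + fb->indices[cur->idx] + cur->off_in_block;

            memcpy(dst + copied, src + src_off, (size_t)take);
            copied += take;
            cur->off_in_block += take;

            if (cur->off_in_block == fb->blocklens[cur->idx]) {  /* block exhausted */
                cur->off_in_block = 0;
                if (++cur->idx == fb->count) {                   /* wrap to next extent */
                    cur->idx = 0;
                    cur->extent_count++;
                }
            }
        }
    }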
The FS_Block_Parms parameter is used to save the + * state and re-use variables thru repetative calls to help in the case of lustre to avoid costly + * recomputation, for consistency gpfs utilizes it as well but doesn't use some aspects of it. This + * function was originally first written for gpfs only and then modified to support lustre. + * + *------------------------------------------------------------------------- + */ +void H5FD_mpio_ccio_osagg_write(CustomAgg_FH_Data ca_data, + ADIO_Offset_CA *offset_list, + ADIO_Offset_CA *len_list, + int contig_access_count, + const void *buf, + H5S_flatbuf_t *memFlatBuf, + int *error_code, + ADIO_Offset_CA firstFileOffset, + ADIO_Offset_CA lastFileOffset, + int numNonZeroDataOffsets, + ADIO_Offset_CA *fd_start, + ADIO_Offset_CA* fd_end, + int hole_found, + FS_Block_Parms *stripe_parms) + +{ + int i,j; /* generic iterators */ + + /* + * Make local copy of certain ADIOI_OneSidedStripeParms elements for + * faster access - pay for pointer dereference only once. + */ + int stripeSize = stripe_parms->stripeSize; + int segmentIter = stripe_parms->segmentIter; + hsize_t bufTypeExtent = stripe_parms->bufTypeExtent; + + if ((stripeSize > 0) && stripe_parms->firstStripedIOCall) + stripe_parms->iWasUsedStripingAgg = 0; + +#ifdef onesidedtrace + if (buf == NULL) { + printf("H5FD_mpio_ccio_osagg_write - buf is NULL contig_access_count is %d\n",contig_access_count); + for (i=0;i 0) + lenListOverZero = 1; + + *error_code = MPI_SUCCESS; /* initialize to success */ + + MPI_Status status; + int nprocs,myrank; + MPI_Comm_size(ca_data->comm, &nprocs); + MPI_Comm_rank(ca_data->comm, &myrank); + +#ifdef onesidedtrace + printf("Rank %d - H5FD_mpio_ccio_osagg_write started\n",myrank); +#endif + + if (ca_data->io_buf_window == MPI_WIN_NULL || ca_data->io_buf_put_amounts_window == MPI_WIN_NULL) + { + HDF5_ccio_win_setup(ca_data, nprocs); + } + + /* + * This flag denotes whether the source datatype is contiguous, which is referenced throughout the algorithm + * and defines how the source buffer offsets and data chunks are determined. If the value is 1 (true - contiguous data) + * things are profoundly simpler in that the source buffer offset for a given target offset simply linearly increases + * by the chunk sizes being written. If the value is 0 (non-contiguous) then these values are based on calculations + * from the flattened source datatype. + */ + int bufTypeIsContig; + if (memFlatBuf->count == 1) + bufTypeIsContig = 1; + else + bufTypeIsContig = 0; + + if (!bufTypeIsContig) { + /* For a non-contiguous source buffer set the extent. */ + if ((stripeSize == 0) || stripe_parms->firstStripedIOCall) { + bufTypeExtent = memFlatBuf->extent; + } + +#ifdef onesidedtrace + printf("Rank %d - memFlatBuf->count is %d bufTypeExtent is %ld\n",myrank,memFlatBuf->count, bufTypeExtent); + for (i=0;icount;i++) + printf("Rank %d - memFlatBuf->blocklens[%d] is %d memFlatBuf->indices[%d] is %ld\n",myrank,i,memFlatBuf->blocklens[i],i,memFlatBuf->indices[i]); +#endif + } + + int naggs = ca_data->cb_nodes; + + /* Track the state of the source buffer for feeding the target data blocks. + * For GPFS the number of file domains per agg is always 1 so we just need 1 agg + * dimension to track the data, in the case of lustre we will need 2 dimensions + * agg and file domain since aggs write to multiple file domains in the case of lustre. + * This structure will be modified as the data is written to reflect the current state + * of the offset. 
+ */ + +#ifdef onesidedtrace + printf("Rank %d - sizeof(FDSourceBufferState_CA) is %d - make sure is 32 for 32-byte memalign optimal\n",myrank,sizeof(FDSourceBufferState_CA)); +#endif + + FDSourceBufferState_CA *currentFDSourceBufferState = (FDSourceBufferState_CA *) H5MM_malloc(naggs * sizeof(FDSourceBufferState_CA)); + + for (i=0;icb_buffer_size); + + /* This logic defines values that are used later to determine what offsets define the portion + * of the file domain the agg is writing this round. + */ + int greatestFileDomainAggRank = -1,smallestFileDomainAggRank = -1; + ADIO_Offset_CA greatestFileDomainOffset = 0; + ADIO_Offset_CA smallestFileDomainOffset = lastFileOffset; + for (j=0;j greatestFileDomainOffset) { + greatestFileDomainOffset = fd_end[j]; + greatestFileDomainAggRank = j; + } + if (fd_start[j] < smallestFileDomainOffset) { + smallestFileDomainOffset = fd_start[j]; + smallestFileDomainAggRank = j; + } + if (ca_data->ranklist[j] == myrank) { + myAggRank = j; + if (fd_end[j] > fd_start[j]) { + iAmUsedAgg = 1; + stripe_parms->iWasUsedStripingAgg = 1; + } + } + } + +#ifdef onesidedtrace + printf("Rank %d - contig_access_count is %d lastFileOffset is %ld firstFileOffset is %ld\n",myrank,contig_access_count,lastFileOffset,firstFileOffset); + for (j=0;j numberOfRounds) + numberOfRounds = currentNumberOfRounds; + } + + /* Data structures to track what data this compute needs to send to whom. + * For lustre they will all need another dimension for the file domain. + */ + int *targetAggsForMyData = (int *)H5MM_malloc(naggs * sizeof(int)); + ADIO_Offset_CA *targetAggsForMyDataFDStart = (ADIO_Offset_CA *)H5MM_malloc(naggs * sizeof(ADIO_Offset_CA)); + ADIO_Offset_CA *targetAggsForMyDataFDEnd = (ADIO_Offset_CA *)H5MM_malloc(naggs * sizeof(ADIO_Offset_CA)); + int numTargetAggs = 0; + + /* This data structure holds the beginning offset and len list index for the range to be written + * coresponding to the round and target agg. Initialize to -1 to denote being unset. + */ + int **targetAggsForMyDataFirstOffLenIndex = (int **)H5MM_malloc(numberOfRounds * sizeof(int *)); + for (i=0;iranklist[%d] is %d fd_start is %ld fd_end is %ld\n",myrank,i,ca_data->ranklist[i],fd_start[i],fd_end[i]); + for (j=0;j 0) && !stripe_parms->firstStripedIOCall) { + currentDataTypeExtent = stripe_parms->lastDataTypeExtent; + currentFlatBufIndice = stripe_parms->lastFlatBufIndice; + currentIndiceOffset = stripe_parms->lastIndiceOffset; +#ifdef onesidedtrace + printf("Rank %d - using stripe_parms->lastDataTypeExtent %ld stripe_parms->lastFlatBufIndice %d stripe_parms->lastIndiceOffset %ld\n", + myrank,stripe_parms->lastDataTypeExtent,stripe_parms->lastFlatBufIndice,stripe_parms->lastIndiceOffset); +#endif + } + + /* This denotes the coll_bufsize boundaries within the source buffer for writing for the same round. + */ + ADIO_Offset_CA intraRoundCollBufsizeOffset = 0; + + /* This data structure tracks what target aggs need to be written to on what rounds. + */ + int *targetAggsForMyDataCurrentRoundIter = (int *)H5MM_malloc(naggs * sizeof(int)); + for (i=0;i 0) && (buf != NULL) && lenListOverZero) { + int blockIter; + for (blockIter=0;blockIter0) { + if (bufTypeIsContig) { + currentSourceBufferOffset += len_list[blockIter-1]; + } + else { + + /* Non-contiguous source datatype, count up the extents and indices to this point + * in the blocks for use in computing the source starting buffer offset for target aggs + * and file domains. 
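The per-[round][agg] arrays allocated above are sized by a single relation: an aggregator moves at most coll_bufsize bytes of its file domain per round, so the round count is the ceiling of the file-domain size over coll_bufsize, maximized across aggregators. A hypothetical helper spelling that out (long long stands in for ADIO_Offset_CA; the patch computes the same quantity inline):

    /* Illustrative: max over aggregators of ceil(domain_size / coll_bufsize).
     * A return of 0 means no aggregator has any data to move. */
    static int compute_number_of_rounds(const long long *fd_start, const long long *fd_end,
                                        int naggs, long long coll_bufsize)
    {
        int j, rounds = 0;
        for (j = 0; j < naggs; j++) {
            long long size = fd_end[j] - fd_start[j] + 1;   /* inclusive range */
            if (size <= 0)
                continue;                                   /* unused aggregator */
            {
                int r = (int)((size + coll_bufsize - 1) / coll_bufsize);
                if (r > rounds)
                    rounds = r;
            }
        }
        return rounds;
    }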
+ */ + ADIO_Offset_CA sourceBlockTotal = 0; + int lastIndiceUsed = currentFlatBufIndice; + int numNonContigSourceChunks = 0; + + while (sourceBlockTotal < len_list[blockIter-1]) { + numNonContigSourceChunks++; + sourceBlockTotal += (memFlatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset); + lastIndiceUsed = currentFlatBufIndice; + currentFlatBufIndice++; + if (currentFlatBufIndice == memFlatBuf->count) { + currentFlatBufIndice = 0; + currentDataTypeExtent++; + } + currentIndiceOffset = (ADIO_Offset_CA)0; + } + if (sourceBlockTotal > len_list[blockIter-1]) { + currentFlatBufIndice--; + if (currentFlatBufIndice < 0 ) { + currentDataTypeExtent--; + currentFlatBufIndice = memFlatBuf->count-1; + } + currentIndiceOffset = len_list[blockIter-1] - (sourceBlockTotal - memFlatBuf->blocklens[lastIndiceUsed]); + } + else + currentIndiceOffset = (ADIO_Offset_CA)0; + maxNumContigOperations += (numNonContigSourceChunks+2); + if (numNonContigSourceChunks > maxNumNonContigSourceChunks) + maxNumNonContigSourceChunks = numNonContigSourceChunks; + +#ifdef onesidedtrace + printf("blockiter %d currentFlatBufIndice is now %d currentDataTypeExtent is now %ld currentIndiceOffset is now %ld maxNumContigOperations is now %d\n",blockIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset,maxNumContigOperations); +#endif + + } // !bufTypeIsContig + } // blockIter > 0 + + /* For the last iteration we need to include these maxNumContigOperations and maxNumNonContigSourceChunks + * for non-contig case even though we did not need to compute the next starting offset. + */ + if ((blockIter == (contig_access_count-1)) && (!bufTypeIsContig)) { + ADIO_Offset_CA sourceBlockTotal = 0; + int tmpCurrentFlatBufIndice = currentFlatBufIndice; + int lastNumNonContigSourceChunks = 0; + while (sourceBlockTotal < len_list[blockIter]) { + lastNumNonContigSourceChunks++; + sourceBlockTotal += memFlatBuf->blocklens[tmpCurrentFlatBufIndice]; + tmpCurrentFlatBufIndice++; + if (tmpCurrentFlatBufIndice == memFlatBuf->count) { + tmpCurrentFlatBufIndice = 0; + } + } + maxNumContigOperations += (lastNumNonContigSourceChunks+2); + if (lastNumNonContigSourceChunks > maxNumNonContigSourceChunks) + maxNumNonContigSourceChunks = lastNumNonContigSourceChunks; + + } + + ADIO_Offset_CA blockStart = offset_list[blockIter], blockEnd = offset_list[blockIter]+len_list[blockIter]-(ADIO_Offset_CA)1; + + /* Find the starting target agg for this block - normally it will be the current agg so guard the expensive + * while loop with a cheap if-check which for large numbers of small blocks will usually be false. + */ + if (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex]))) { + while (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex]))) + currentAggRankListIndex++; + }; + +#ifdef onesidedtrace + printf("Rank %d - currentAggRankListIndex is %d blockStart %ld blockEnd %ld fd_start[currentAggRankListIndex] %ld fd_end[currentAggRankListIndex] %ld\n",myrank,currentAggRankListIndex,blockStart,blockEnd,fd_start[currentAggRankListIndex],fd_end[currentAggRankListIndex]); +#endif + + /* Determine if this is a new target agg. 
+ */ + if (blockIter>0) { + if ((offset_list[blockIter-1]+len_list[blockIter-1]-(ADIO_Offset_CA)1) < fd_start[currentAggRankListIndex]) { + numTargetAggs++; + } + } + + /* Determine which round to start writing - data is written coll_bufsize per round from the aggregator + * so if our starting offset in the file domain is multiple coll_bufsize that will correspond to the round. + */ + if ((blockStart - fd_start[currentAggRankListIndex]) >= coll_bufsize) { + ADIO_Offset_CA currentRoundBlockStart = fd_start[currentAggRankListIndex]; + int startingRound = 0; + while (blockStart > (currentRoundBlockStart + coll_bufsize - (ADIO_Offset_CA)1)) { + currentRoundBlockStart+=coll_bufsize; + startingRound++; + } + targetAggsForMyDataCurrentRoundIter[numTargetAggs] = startingRound; + } + + /* Initialize the data structures if this is the first offset in the round/target agg. + */ + if (targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] == -1) { + targetAggsForMyData[numTargetAggs] = ca_data->ranklist[currentAggRankListIndex]; + targetAggsForMyDataFDStart[numTargetAggs] = fd_start[currentAggRankListIndex]; + /* Round up file domain to the first actual offset used if this is the first file domain. + */ + if (currentAggRankListIndex == smallestFileDomainAggRank) { + if (targetAggsForMyDataFDStart[numTargetAggs] < firstFileOffset) + targetAggsForMyDataFDStart[numTargetAggs] = firstFileOffset; + } + targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[currentAggRankListIndex]; + /* Round down file domain to the last actual offset used if this is the last file domain. + */ + if (currentAggRankListIndex == greatestFileDomainAggRank) { + if (targetAggsForMyDataFDEnd[numTargetAggs] > lastFileOffset) + targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset; + } + targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; + /* Set the source buffer state starting point for data access for this agg and file domain. 
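The loop above that derives startingRound advances one coll_bufsize window at a time; because each round of an aggregator covers exactly one such window of its file domain, the same value is a plain integer quotient. An equivalent sketch for illustration (the patch keeps the loop; long long stands in for ADIO_Offset_CA):

    /* Round r of an aggregator covers file offsets
     *   [fd_start + r*coll_bufsize, fd_start + (r+1)*coll_bufsize - 1]
     * so the round holding blockStart is a plain quotient. */
    static int starting_round(long long blockStart, long long fd_start,
                              long long coll_bufsize)
    {
        return (int)((blockStart - fd_start) / coll_bufsize);
    }

    /* The boundary used below to detect spill into the next round is then
     *   intraRoundCollBufsizeOffset = fd_start + (startingRound + 1) * coll_bufsize
     */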
*/ + + if (bufTypeIsContig) { + if (currentFDSourceBufferState[numTargetAggs].sourceBufferOffset == -1) { + currentFDSourceBufferState[numTargetAggs].sourceBufferOffset = currentSourceBufferOffset; +#ifdef onesidedtrace + printf("Rank %d - For agg %d sourceBufferOffset initialized to %ld\n",myrank,currentAggRankListIndex,currentSourceBufferOffset); +#endif + } + } + else { + if (currentFDSourceBufferState[numTargetAggs].indiceOffset == -1) { + currentFDSourceBufferState[numTargetAggs].indiceOffset = currentIndiceOffset; + currentFDSourceBufferState[numTargetAggs].bufTypeExtent = bufTypeExtent; + currentFDSourceBufferState[numTargetAggs].dataTypeExtent = currentDataTypeExtent; + currentFDSourceBufferState[numTargetAggs].flatBufIndice = currentFlatBufIndice; +#ifdef onesidedtrace + printf("Rank %d - For agg %d dataTypeExtent initialized to %ld flatBufIndice to %d indiceOffset to %ld\n",myrank,numTargetAggs,currentDataTypeExtent,currentFlatBufIndice,currentIndiceOffset); +#endif + } + } + + intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((ADIO_Offset_CA)(targetAggsForMyDataCurrentRoundIter[numTargetAggs]+1) * coll_bufsize); + +#ifdef onesidedtrace + printf("Rank %d - Initial settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld currentSourceBufferOffset set to %ld intraRoundCollBufsizeOffset set to %ld\n",myrank,numTargetAggs,blockIter,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter],currentSourceBufferOffset,intraRoundCollBufsizeOffset); +#endif + } + + /* Replace the last offset block iter with this one. + */ + targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; + + /* If this blocks extends into the next file domain advance to the next target aggs and source buffer states. + */ + if (blockEnd > fd_end[currentAggRankListIndex]) { + + ADIO_Offset_CA amountToAdvanceSBOffsetForFD = 0; + int additionalFDCounter = 0; + + while (blockEnd > fd_end[currentAggRankListIndex]) { +#ifdef onesidedtrace + printf("Rank %d - block extends past current fd, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld total block size is %ld blockStart was %ld\n",myrank,blockEnd,fd_end[currentAggRankListIndex], len_list[blockIter],blockStart); +#endif + ADIO_Offset_CA thisAggBlockEnd = fd_end[currentAggRankListIndex]; + if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) { + while (thisAggBlockEnd >= intraRoundCollBufsizeOffset) { + targetAggsForMyDataCurrentRoundIter[numTargetAggs]++; + intraRoundCollBufsizeOffset += coll_bufsize; + targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; + targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; +#ifdef onesidedtrace + printf("Rank %d - targetAggsForMyDataCurrentRoundI%d] is now %d intraRoundCollBufsizeOffset is now %ld\n",myrank,numTargetAggs,targetAggsForMyDataCurrentRoundIter[numTargetAggs],intraRoundCollBufsizeOffset); +#endif + } // while (thisAggBlockEnd >= intraRoundCollBufsizeOffset) + } // if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) + + int prevAggRankListIndex = currentAggRankListIndex; + currentAggRankListIndex++; + + /* Skip over unused aggs. 
+ */ + if (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) { + while (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) + currentAggRankListIndex++; + } // (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) + + /* Start new target agg. + */ + if (blockEnd >= fd_start[currentAggRankListIndex]) { + numTargetAggs++; + targetAggsForMyData[numTargetAggs] = ca_data->ranklist[currentAggRankListIndex]; + targetAggsForMyDataFDStart[numTargetAggs] = fd_start[currentAggRankListIndex]; + /* Round up file domain to the first actual offset used if this is the first file domain. + */ + if (currentAggRankListIndex == smallestFileDomainAggRank) { + if (targetAggsForMyDataFDStart[numTargetAggs] < firstFileOffset) + targetAggsForMyDataFDStart[numTargetAggs] = firstFileOffset; + } + targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[currentAggRankListIndex]; + /* Round down file domain to the last actual offset used if this is the last file domain. + */ + if (currentAggRankListIndex == greatestFileDomainAggRank) { + if (targetAggsForMyDataFDEnd[numTargetAggs] > lastFileOffset) + targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset; + } + targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; + /* For the first additonal file domain the source buffer offset + * will be incremented relative to the state of this first main + * loop but for subsequent full file domains the offset will be + * incremented by the size + * of the file domain. + */ + if (additionalFDCounter == 0) + amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex] - blockStart) + (ADIO_Offset_CA)1; + else + amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex] - fd_start[prevAggRankListIndex]) +(ADIO_Offset_CA)1; + + if (bufTypeIsContig) { + HDassert(numTargetAggs > 0); + if (currentFDSourceBufferState[numTargetAggs].sourceBufferOffset == -1) { + if (additionalFDCounter == 0) { // first file domain, still use the current data counter + currentFDSourceBufferState[numTargetAggs].sourceBufferOffset = + currentSourceBufferOffset+amountToAdvanceSBOffsetForFD; + } + else { // 2nd file domain, advance full file domain from last source buffer state + currentFDSourceBufferState[numTargetAggs].sourceBufferOffset = + currentFDSourceBufferState[numTargetAggs-1].sourceBufferOffset+amountToAdvanceSBOffsetForFD; + } +#ifdef onesidedtrace + printf("Rank %d - Crossed into new FD - for agg %d sourceBufferOffset initialized to %ld amountToAdvanceSBOffsetForFD is %ld\n",myrank,numTargetAggs,currentFDSourceBufferState[numTargetAggs].sourceBufferOffset,amountToAdvanceSBOffsetForFD); +#endif + } + } + else if (currentFDSourceBufferState[numTargetAggs].indiceOffset == -1) { + /* non-contiguos source buffer */ + HDassert(numTargetAggs > 0); + + /* Initialize the source buffer state appropriately and then + * advance it with the + * H5FD_mpio_nc_buffer_advance function. 
+ */ + if (additionalFDCounter == 0) { + // first file domain, still use the current data counter + currentFDSourceBufferState[numTargetAggs].indiceOffset = + currentIndiceOffset; + currentFDSourceBufferState[numTargetAggs].bufTypeExtent = bufTypeExtent; + currentFDSourceBufferState[numTargetAggs].dataTypeExtent = + currentDataTypeExtent; + currentFDSourceBufferState[numTargetAggs].flatBufIndice = + currentFlatBufIndice; + } + else { + // 2nd file domain, advance full file domain from last source buffer state + currentFDSourceBufferState[numTargetAggs].indiceOffset = + currentFDSourceBufferState[numTargetAggs-1].indiceOffset; + currentFDSourceBufferState[numTargetAggs].bufTypeExtent = + currentFDSourceBufferState[numTargetAggs-1].bufTypeExtent; + currentFDSourceBufferState[numTargetAggs].dataTypeExtent = + currentFDSourceBufferState[numTargetAggs-1].dataTypeExtent; + currentFDSourceBufferState[numTargetAggs].flatBufIndice = + currentFDSourceBufferState[numTargetAggs-1].flatBufIndice; + } + H5FD_mpio_nc_buffer_advance(((char*)buf), memFlatBuf, + (int)amountToAdvanceSBOffsetForFD, 1, + ¤tFDSourceBufferState[numTargetAggs], NULL); +#ifdef onesidedtrace + printf("Rank %d - Crossed into new FD - for agg %d dataTypeExtent initialized to %ld flatBufIndice to %d indiceOffset to %ld amountToAdvanceSBOffsetForFD is %d\n",myrank,numTargetAggs,currentFDSourceBufferState[numTargetAggs].dataTypeExtent,currentFDSourceBufferState[numTargetAggs].flatBufIndice,currentFDSourceBufferState[numTargetAggs].indiceOffset,amountToAdvanceSBOffsetForFD); +#endif + } + additionalFDCounter++; + +#ifdef onesidedtrace + printf("Rank %d - block extended beyond fd init settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",myrank,numTargetAggs,blockIter,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]); +#endif + intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + coll_bufsize; + targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; + + } // if (blockEnd >= fd_start[currentAggRankListIndex]) + } // while (blockEnd > fd_end[currentAggRankListIndex]) + } // if (blockEnd > fd_end[currentAggRankListIndex]) + + /* If we are still in the same file domain / target agg but have gone + * past the coll_bufsize and need to advance to the next round - + * initialize tracking data appropriately. + */ + if (blockEnd >= intraRoundCollBufsizeOffset) { + ADIO_Offset_CA currentBlockEnd = blockEnd; + while (currentBlockEnd >= intraRoundCollBufsizeOffset) { + targetAggsForMyDataCurrentRoundIter[numTargetAggs]++; + intraRoundCollBufsizeOffset += coll_bufsize; + targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; + targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter; +#ifdef onesidedtrace + printf("Rank %d - smaller than fd currentBlockEnd is now %ld intraRoundCollBufsizeOffset is now %ld targetAggsForMyDataCurrentRoundIter[%d] is now %d\n",myrank,currentBlockEnd, intraRoundCollBufsizeOffset, numTargetAggs,targetAggsForMyDataCurrentRoundIter[numTargetAggs]); +#endif + } // while (currentBlockEnd >= intraRoundCollBufsizeOffset) + } // if (blockEnd >= intraRoundCollBufsizeOffset) + + /* Need to advance numTargetAggs if this is the last target offset to + * include this one. 
+ */ + if (blockIter == (contig_access_count-1)) + numTargetAggs++; + } + +#ifdef onesidedtrace + printf("Rank %d - numTargetAggs is %d\n",myrank,numTargetAggs); + for (i=0;i 0) loop...\n",myrank); + fflush(stdout); +#endif + + } // if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) + +#ifdef onesidedtrace + printf("Rank %d - Done with (contig_access_count > 0) loop.\n",myrank); + fflush(stdout); +#endif + + H5MM_free(targetAggsForMyDataCurrentRoundIter); + + +#ifdef onesidedtrace + printf("Rank %d - targetAggsForMyDataCurrentRoundIter freed.\n",myrank); + fflush(stdout); +#endif + + /* use the write buffer allocated in the file_open */ + char *write_buf; + MPI_Win write_buf_window; + if(!ca_data->onesided_no_rmw) { + hole_found = 0; + } + + /* Async I/O - Adjust if this is the "duplicate" buffer */ + if (ca_data->use_dup) { + write_buf = ca_data->io_buf_d; + write_buf_window = ca_data->io_buf_window_d; + } else { + write_buf = ca_data->io_buf; + write_buf_window = ca_data->io_buf_window; + } + +#ifdef onesidedtrace + printf("Rank %d - write_buf and write_buf_window set.\n",myrank); + fflush(stdout); +#endif + + /* Counters to track the offset range being written by the used aggs. + */ + ADIO_Offset_CA currentRoundFDStart = 0; + ADIO_Offset_CA currentRoundFDEnd = 0; + + if (iAmUsedAgg) { + currentRoundFDStart = fd_start[myAggRank]; + currentRoundFDEnd = fd_end[myAggRank]; + if (myAggRank == smallestFileDomainAggRank) { + if (currentRoundFDStart < firstFileOffset) + currentRoundFDStart = firstFileOffset; + } + else if (myAggRank == greatestFileDomainAggRank) { + if (currentRoundFDEnd > lastFileOffset) + currentRoundFDEnd = lastFileOffset; + } +#ifdef onesidedtrace + printf("Rank %d - iAmUsedAgg - currentRoundFDStart initialized to %ld currentRoundFDEnd to %ld\n",myrank,currentRoundFDStart,currentRoundFDEnd); +#endif + + if ((stripeSize > 0) && (segmentIter == 0)) { + stripe_parms->numStripesUsed = 0; + stripe_parms->stripeIOoffsets = (MPI_Offset *) H5MM_malloc(stripe_parms->stripesPerAgg*sizeof(MPI_Offset)); + stripe_parms->stripeIOLens = (int *) H5MM_malloc(stripe_parms->stripesPerAgg*sizeof(int)); + stripe_parms->amountOfStripedDataExpected = 0; + int stripeIter = 0; + for (stripeIter=0;stripeIterstripesPerAgg;stripeIter++) { + if (stripeIter == 0) { + stripe_parms->stripeIOoffsets[stripeIter] = currentRoundFDStart; + stripe_parms->stripeIOLens[stripeIter] = (int)(currentRoundFDEnd - currentRoundFDStart)+1; + stripe_parms->amountOfStripedDataExpected += (int)(currentRoundFDEnd - currentRoundFDStart)+1; + stripe_parms->numStripesUsed++; + } + else { + if (((currentRoundFDEnd + (ADIO_Offset_CA)1 + ((ADIO_Offset_CA)stripeIter * stripe_parms->segmentLen))) > stripe_parms->stripedLastFileOffset) { + if (((currentRoundFDEnd + (ADIO_Offset_CA)1 - (ADIO_Offset_CA)(stripe_parms->stripeSize) + ((ADIO_Offset_CA)stripeIter * stripe_parms->segmentLen))) <= stripe_parms->stripedLastFileOffset) { + stripe_parms->stripeIOoffsets[stripeIter] = (currentRoundFDEnd + (ADIO_Offset_CA)1) - (ADIO_Offset_CA)(stripe_parms->stripeSize) + ((ADIO_Offset_CA)stripeIter * stripe_parms->segmentLen); + stripe_parms->stripeIOLens[stripeIter] = (int)(stripe_parms->stripedLastFileOffset - (currentRoundFDEnd + (ADIO_Offset_CA)1 - (ADIO_Offset_CA)(stripe_parms->stripeSize) + ((ADIO_Offset_CA)stripeIter * stripe_parms->segmentLen)) + (ADIO_Offset_CA)1); + stripe_parms->amountOfStripedDataExpected += (int)(stripe_parms->stripedLastFileOffset - (currentRoundFDEnd + (ADIO_Offset_CA)1 - 
(ADIO_Offset_CA)(stripe_parms->stripeSize) + ((ADIO_Offset_CA)stripeIter * stripe_parms->segmentLen)) + (ADIO_Offset_CA)1); + stripe_parms->numStripesUsed++; + } + } + else { + stripe_parms->stripeIOoffsets[stripeIter] = (currentRoundFDEnd + (ADIO_Offset_CA)1) - (ADIO_Offset_CA)(stripe_parms->stripeSize) + ((ADIO_Offset_CA)stripeIter * stripe_parms->segmentLen); + stripe_parms->stripeIOLens[stripeIter] = stripe_parms->stripeSize; + stripe_parms->amountOfStripedDataExpected += stripe_parms->stripeSize; + stripe_parms->numStripesUsed++; + } + } + } // for-loop +#ifdef onesidedtrace + printf("Rank %d - stripe_parms->amountOfStripedDataExpected is %d stripe_parms->numStripesUsed is %d offsets and lengths are ",myrank,stripe_parms->amountOfStripedDataExpected,stripe_parms->numStripesUsed); + for (i=0;inumStripesUsed;i++) { + printf("%ld %ld --",stripe_parms->stripeIOoffsets[i],stripe_parms->stripeIOLens[i]); + } + printf("\n"); +#endif + } // if ((stripe_parms->stripeSize>0) && (stripe_parms->segmentIter==0)) + + if (ca_data->onesided_always_rmw && ((stripeSize==0) || (stripe_parms->segmentIter==0))) { // read in the first buffer + ADIO_Offset_CA tmpCurrentRoundFDEnd = 0; + if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) { + if (myAggRank == greatestFileDomainAggRank) { + if (fd_end[myAggRank] > lastFileOffset) + tmpCurrentRoundFDEnd = lastFileOffset; + else + tmpCurrentRoundFDEnd = fd_end[myAggRank]; + } + else + tmpCurrentRoundFDEnd = fd_end[myAggRank]; + } + else + tmpCurrentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset_CA)1; +#ifdef onesidedtrace + printf("Rank %d - ca_data->onesided_always_rmw - first buffer pre-read for file offsets %ld to %ld total is %d\n",myrank,currentRoundFDStart,tmpCurrentRoundFDEnd,(int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1); +#endif + if (stripeSize==0) { + MPI_File_read_at(ca_data->fh, currentRoundFDStart, write_buf, (int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1, + MPI_BYTE, error_code); + } + else { + /* pre-read the entire batch of stripes we will do before writing */ + int stripeIter = 0; + for (stripeIter=0;stripeIternumStripesUsed;stripeIter++) + MPI_File_read_at(ca_data->fh, stripe_parms->stripeIOoffsets[stripeIter], (char*)write_buf + ((ADIO_Offset_CA)stripeIter * (ADIO_Offset_CA)stripeSize), stripe_parms->stripeIOLens[stripeIter], MPI_BYTE, error_code); + } + } + + } // if iAmUsedAgg + + if (ca_data->onesided_always_rmw && ((stripeSize == 0) || (segmentIter == 0))) // wait until the first buffer is read + MPI_Barrier(ca_data->comm); + +#ifdef onesidedtrace + MPI_Barrier(ca_data->comm); + if(myrank==0) { printf("\n\n"); fflush(stdout); } + MPI_Barrier(ca_data->comm); + printf("Rank %d is waiting at barrier between main loops.\n", myrank); + printf("Rank %d -- numberOfRounds = %d, contig_access_count = %d, numTargetAggs = %d\n", myrank, numberOfRounds, contig_access_count, numTargetAggs); + fflush(stdout); + MPI_Barrier(ca_data->comm); + if(myrank==0) { printf("\n\n"); fflush(stdout); } + MPI_Barrier(ca_data->comm); +#endif + + /* This is the second main loop of the algorithm, actually nested loop of target aggs within rounds. There are 2 flavors of this. + * For onesided_write_aggmethod of 1 each nested iteration for the target + * agg does an mpi_put on a contiguous chunk using a primative datatype + * determined using the data structures from the first main loop. 
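The onesided_always_rmw pre-reads above are the read-modify-write guard: if the incoming puts might not cover every byte of the region an aggregator will write back (a "hole"), the existing file contents are read into the collective buffer first so untouched bytes survive the write. A bare-bones sketch of that guard with hypothetical names:

    #include <mpi.h>

    /* Read-modify-write guard (illustrative): before accepting one-sided puts
     * into collbuf, load the current file contents for [start, end] so any
     * bytes no rank writes keep their old values when collbuf is flushed. */
    static void maybe_preread(MPI_File fh, int may_have_holes,
                              MPI_Offset start, MPI_Offset end, char *collbuf)
    {
        MPI_Status status;
        if (may_have_holes)
            MPI_File_read_at(fh, start, collbuf, (int)(end - start + 1),
                             MPI_BYTE, &status);
    }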
For + * onesided_write_aggmethod of 2 each nested iteration for the target agg + * builds up data to use in created a derived data type for 1 mpi_put that is done for the target agg for each round. + * To support lustre there will need to be an additional layer of nesting + * for the multiple file domains within target aggs. + */ + int roundIter; + + for (roundIter=0;roundIter 0) && (buf != NULL) && lenListOverZero) { + + int aggIter; + for (aggIter=0;aggIter targetAggsForMyDataFDEnd[aggIter]) + currentRoundFDEndForMyTargetAgg = targetAggsForMyDataFDEnd[aggIter]; + + ADIO_Offset_CA offsetStart = offset_list[offsetIter], offsetEnd = (offset_list[offsetIter]+len_list[offsetIter]-(ADIO_Offset_CA)1); + +#ifdef onesidedtrace + printf("Rank %d - roundIter %d target iter %d targetAggsForMyData is %d offset_list[%d] is %ld len_list[%d] is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld targetAggsForMyDataFirstOffLenIndex is %ld\n", + myrank,roundIter,aggIter,targetAggsForMyData[aggIter],offsetIter,offset_list[offsetIter],offsetIter,len_list[offsetIter], + targetAggsForMyDataFDStart[aggIter],targetAggsForMyDataFDEnd[aggIter], + currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg, targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter]); +#endif + + /* Determine the amount of data and exact source buffer offsets to use. + */ + int bufferAmountToSend = 0; + + if ((offsetStart >= currentRoundFDStartForMyTargetAgg) && (offsetStart <= currentRoundFDEndForMyTargetAgg)) { + if (offsetEnd > currentRoundFDEndForMyTargetAgg) + bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - offsetStart) +1; + else + bufferAmountToSend = (offsetEnd - offsetStart) +1; + } + else if ((offsetEnd >= currentRoundFDStartForMyTargetAgg) && (offsetEnd <= currentRoundFDEndForMyTargetAgg)) { + if (offsetEnd > currentRoundFDEndForMyTargetAgg) + bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1; + else + bufferAmountToSend = (offsetEnd - currentRoundFDStartForMyTargetAgg) +1; + if (offsetStart < currentRoundFDStartForMyTargetAgg) { + offsetStart = currentRoundFDStartForMyTargetAgg; + } + } + else if ((offsetStart <= currentRoundFDStartForMyTargetAgg) && (offsetEnd >= currentRoundFDEndForMyTargetAgg)) { + bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1; + offsetStart = currentRoundFDStartForMyTargetAgg; + } + + numBytesPutThisAggRound += bufferAmountToSend; +#ifdef onesidedtrace + printf("Rank %d - bufferAmountToSend is %d\n",myrank,bufferAmountToSend); +#endif + if (bufferAmountToSend > 0) { /* we have data to send this round */ + + if (ca_data->onesided_write_aggmethod == 2) { + /* Only allocate these arrays if we are using method 2 and only do it once for this round/target agg. 
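The three branches above compute the overlap of the block [offsetStart, offsetEnd] with the aggregator's current-round window and clamp offsetStart to that window. Written as a single interval intersection, the same logic is (equivalent sketch, not the patch's code; long long stands in for ADIO_Offset_CA):

    /* Overlap of [blk_start, blk_end] with the round window [win_start, win_end].
     * Returns the number of bytes to send and clamps *send_start to the window. */
    static long long overlap_bytes(long long blk_start, long long blk_end,
                                   long long win_start, long long win_end,
                                   long long *send_start)
    {
        long long lo = (blk_start > win_start) ? blk_start : win_start;
        long long hi = (blk_end   < win_end)   ? blk_end   : win_end;

        if (lo > hi)
            return 0;          /* no data for this aggregator this round */
        *send_start = lo;      /* corresponds to clamping offsetStart */
        return hi - lo + 1;    /* inclusive endpoints, as in the surrounding code */
    }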
+ */ + if (!allocatedDerivedTypeArrays) { + targetAggBlockLengths = (int *)H5MM_malloc(maxNumContigOperations * sizeof(int)); + targetAggDisplacements = (MPI_Aint *)H5MM_malloc(maxNumContigOperations * sizeof(MPI_Aint)); + sourceBufferDisplacements = (MPI_Aint *)H5MM_malloc(maxNumContigOperations * sizeof(MPI_Aint)); + targetAggDataTypes = (MPI_Datatype *)H5MM_malloc(maxNumContigOperations * sizeof(MPI_Datatype)); + if (!bufTypeIsContig) { + int k; + for (k=targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter];k<=targetAggsForMyDataLastOffLenIndex[roundIter][aggIter];k++) + amountOfDataWrittenThisRoundAgg += len_list[k]; + +#ifdef onesidedtrace + printf("Rank %d - derivedTypePackedSourceBuffer mallocing %ld\n",myrank,amountOfDataWrittenThisRoundAgg); +#endif + + if (amountOfDataWrittenThisRoundAgg > 0) + derivedTypePackedSourceBuffer = (char *)H5MM_malloc(amountOfDataWrittenThisRoundAgg * sizeof(char)); + else + derivedTypePackedSourceBuffer = NULL; + } + allocatedDerivedTypeArrays = 1; + } + } + + /* Determine the offset into the target window. + */ + MPI_Aint targetDisplacementToUseThisRound = (MPI_Aint) (offsetStart - currentRoundFDStartForMyTargetAgg) + ((MPI_Aint)(segmentIter)*(MPI_Aint)(stripeSize)); + + + /* For onesided_write_aggmethod of 1 do the mpi_put using the primitive MPI_BYTE type for each contiguous + * chunk in the target, of source data is non-contiguous then pack the data first. + */ + if (ca_data->onesided_write_aggmethod == 1) { + + MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], MPI_MODE_NOCHECK, write_buf_window); + + char *putSourceData; + if (bufTypeIsContig) { +#ifdef onesidedtrace + printf("Rank %d - ca_data->onesided_write_aggmethod == 1 currentFDSourceBufferState[%d].sourceBufferOffset is %ld bufferAmountToSend is %d targetAggsForMyData[aggIter] is %d targetDisplacementToUseThisRound is %d write_buf_window is %016lx\n",myrank,aggIter,currentFDSourceBufferState[aggIter].sourceBufferOffset,bufferAmountToSend,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound,write_buf_window); + fflush(stdout); +#endif + MPI_Put(((char*)buf) + currentFDSourceBufferState[aggIter].sourceBufferOffset,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window); + currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset_CA)bufferAmountToSend; + } + else { + putSourceData = (char *) H5MM_malloc(bufferAmountToSend*sizeof(char)); + H5FD_mpio_nc_buffer_advance(((char*)buf), memFlatBuf, bufferAmountToSend, 1, ¤tFDSourceBufferState[aggIter], putSourceData); + + MPI_Put(putSourceData,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window); + } + + MPI_Win_unlock(targetAggsForMyData[aggIter], write_buf_window); + + if (!bufTypeIsContig) + H5MM_free(putSourceData); + } + + + /* For aggmethod 2, populate the data structures for this round/agg for this offset iter + * to be used subsequently when building the derived type for 1 mpi_put for all the data for this + * round/agg. 
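For reference, the aggmethod-1 branch above is a standard passive-target RMA epoch per contiguous piece: lock the aggregator's collective-buffer window in shared mode, MPI_Put one byte range at the computed displacement, and unlock. A self-contained sketch of that pattern plus the window creation it relies on (buffer names and sizes here are assumptions, not the CCIO structures):

    #include <mpi.h>

    /* Deposit nbytes from src at byte displacement disp inside rank agg's
     * exposed collective buffer, using a passive-target epoch. */
    static void put_to_agg(MPI_Win win, int agg, const char *src,
                           int nbytes, MPI_Aint disp)
    {
        MPI_Win_lock(MPI_LOCK_SHARED, agg, MPI_MODE_NOCHECK, win);
        MPI_Put(src, nbytes, MPI_BYTE, agg, disp, nbytes, MPI_BYTE, win);
        MPI_Win_unlock(agg, win);   /* completes the put at origin and target */
    }

    /* Window creation, typically done once per file (cf. HDF5_ccio_win_setup):
     * every rank exposes its local collective buffer of buf_size bytes. */
    static MPI_Win make_window(void *local_buf, MPI_Aint buf_size, MPI_Comm comm)
    {
        MPI_Win win;
        MPI_Win_create(local_buf, buf_size, 1 /* displacements in bytes */,
                       MPI_INFO_NULL, comm, &win);
        return win;
    }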
+             */
+            else if (ca_data->onesided_write_aggmethod == 2) {
+
+              if (bufTypeIsContig) {
+                targetAggBlockLengths[targetAggContigAccessCount]= bufferAmountToSend;
+                targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
+                targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
+                sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)currentFDSourceBufferState[aggIter].sourceBufferOffset;
+                currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset_CA)bufferAmountToSend;
+                targetAggContigAccessCount++;
+              }
+              else {
+                H5FD_mpio_nc_buffer_advance(((char*)buf), memFlatBuf, bufferAmountToSend, 1, &currentFDSourceBufferState[aggIter], &derivedTypePackedSourceBuffer[derivedTypePackedSourceBufferOffset]);
+                targetAggBlockLengths[targetAggContigAccessCount]= bufferAmountToSend;
+                targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
+                targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
+                sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)derivedTypePackedSourceBufferOffset;
+                targetAggContigAccessCount++;
+                derivedTypePackedSourceBufferOffset += (ADIO_Offset_CA)bufferAmountToSend;
+              }
+            }
+
+#ifdef onesidedtrace
+            printf("Rank %d - roundIter %d bufferAmountToSend is %d offsetStart is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld targetDisplacementToUseThisRound is %ld targetAggsForMyDataFDStart[aggIter] is %ld\n",myrank,roundIter, bufferAmountToSend, offsetStart,currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg,targetDisplacementToUseThisRound,targetAggsForMyDataFDStart[aggIter]);
+#endif
+
+          } // bufferAmountToSend > 0
+        } // contig list
+
+        /* For aggmethod 2, now build the derived type using the data from this round/agg and do a single MPI_Put.
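The code that follows builds one struct datatype over the source displacements and one over the target displacements (same block lengths on both sides), so all of a round's data for one aggregator moves in a single MPI_Put; the MPI_Accumulate of the byte count is what later lets the aggregator detect holes (bytes expected vs. bytes actually put). A condensed sketch of that pairing with placeholder arrays and names:

    #include <mpi.h>

    /* One put per aggregator per round: n pieces, each blocklens[i] bytes,
     * read from src+src_disps[i] and landing at tgt_disps[i] in the window. */
    static void put_batched(const char *src, int n, int *blocklens,
                            MPI_Aint *src_disps, MPI_Aint *tgt_disps,
                            MPI_Datatype *types,          /* all MPI_BYTE here */
                            int agg, MPI_Win data_win,
                            int bytes_put, MPI_Win count_win)
    {
        MPI_Datatype src_type, tgt_type;

        MPI_Type_create_struct(n, blocklens, src_disps, types, &src_type);
        MPI_Type_commit(&src_type);
        MPI_Type_create_struct(n, blocklens, tgt_disps, types, &tgt_type);
        MPI_Type_commit(&tgt_type);

        MPI_Win_lock(MPI_LOCK_SHARED, agg, MPI_MODE_NOCHECK, data_win);
        MPI_Put(src, 1, src_type, agg, 0, 1, tgt_type, data_win);
        MPI_Win_unlock(agg, data_win);

        /* Report how many bytes were deposited so the aggregator can compare
         * the sum against what it expects and fall back to read-modify-write
         * when the region has holes. */
        MPI_Win_lock(MPI_LOCK_SHARED, agg, MPI_MODE_NOCHECK, count_win);
        MPI_Accumulate(&bytes_put, 1, MPI_INT, agg, 0, 1, MPI_INT, MPI_SUM, count_win);
        MPI_Win_unlock(agg, count_win);

        MPI_Type_free(&src_type);
        MPI_Type_free(&tgt_type);
    }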
+ */ + if (ca_data->onesided_write_aggmethod == 2) { + + MPI_Datatype sourceBufferDerivedDataType, targetBufferDerivedDataType; + MPI_Type_create_struct(targetAggContigAccessCount, targetAggBlockLengths, sourceBufferDisplacements, targetAggDataTypes, &sourceBufferDerivedDataType); + MPI_Type_commit(&sourceBufferDerivedDataType); + MPI_Type_create_struct(targetAggContigAccessCount, targetAggBlockLengths, targetAggDisplacements, targetAggDataTypes, &targetBufferDerivedDataType); + MPI_Type_commit(&targetBufferDerivedDataType); + +#ifdef onesidedtrace + printf("Rank %d - mpi_put of derived type to agg %d targetAggContigAccessCount is %d\n",myrank,targetAggsForMyData[aggIter],targetAggContigAccessCount); +#endif + + if (targetAggContigAccessCount > 0) { + +#ifdef onesidedtrace + printf("Rank %d - Calling 1st MPI_Win_lock\n",myrank); + fflush(stdout); +#endif + + MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], MPI_MODE_NOCHECK, write_buf_window); + + if (bufTypeIsContig) { +#ifdef onesidedtrace + printf("Rank %d - Calling MPI_Put with bufTypeIsContig==TRUE, aggIter %ld, targetAggsForMyData[aggIter] is %ld\n",myrank,aggIter,targetAggsForMyData[aggIter]); + fflush(stdout); +#endif + MPI_Put(((char*)buf),1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window); + } + else { + +#ifdef onesidedtrace + printf("Rank %d - Calling MPI_Put with bufTypeIsContig==FALSE, aggIter %ld, targetAggsForMyData[aggIter] is %ld\n",myrank,aggIter,targetAggsForMyData[aggIter]); + fflush(stdout); +#endif + MPI_Put(derivedTypePackedSourceBuffer,1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window); + } +#ifdef onesidedtrace + printf("Rank %d - Calling 1st MPI_Win_UNlock\n",myrank); + fflush(stdout); +#endif + MPI_Win_unlock(targetAggsForMyData[aggIter], write_buf_window); + //MPI_Win_fence(0, write_buf_window); + } + + if (allocatedDerivedTypeArrays) { + H5MM_free(targetAggBlockLengths); + H5MM_free(targetAggDisplacements); + H5MM_free(targetAggDataTypes); + H5MM_free(sourceBufferDisplacements); + if (!bufTypeIsContig) + if (derivedTypePackedSourceBuffer != NULL) + H5MM_free(derivedTypePackedSourceBuffer); + } + if (targetAggContigAccessCount > 0) { + MPI_Type_free(&sourceBufferDerivedDataType); + MPI_Type_free(&targetBufferDerivedDataType); + } + + } + + if (!ca_data->onesided_no_rmw) { + + MPI_Win io_buf_put_amounts_window_use = ca_data->io_buf_put_amounts_window; + if (ca_data->use_dup) { + io_buf_put_amounts_window_use = ca_data->io_buf_put_amounts_window_d; + } +#ifdef onesidedtrace + printf("Rank %d - Calling 2nd MPI_Win_lock\n",myrank); + fflush(stdout); +#endif + MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], MPI_MODE_NOCHECK, io_buf_put_amounts_window_use); +#ifdef onesidedtrace + printf("Rank %d - Calling MPI_Accumulate\n",myrank); + fflush(stdout); +#endif + MPI_Accumulate(&numBytesPutThisAggRound,1, MPI_INT,targetAggsForMyData[aggIter],0, 1, MPI_INT, MPI_SUM, io_buf_put_amounts_window_use); +#ifdef onesidedtrace + printf("Rank %d - Calling 2nd MPI_Win_UNlock\n",myrank); + fflush(stdout); +#endif + MPI_Win_unlock(targetAggsForMyData[aggIter], io_buf_put_amounts_window_use); + } + + } // baseoffset != -1 + } // target aggs + + if (stripeSize > 0) { + stripe_parms->lastDataTypeExtent = currentFDSourceBufferState[numTargetAggs-1].dataTypeExtent; + stripe_parms->lastFlatBufIndice = currentFDSourceBufferState[numTargetAggs-1].flatBufIndice; + stripe_parms->lastIndiceOffset = 
currentFDSourceBufferState[numTargetAggs-1].indiceOffset; +#ifdef onesidedtrace + printf("Rank %d - setting stripe_parms->lastDataTypeExtent %ld stripe_parms->lastFlatBufIndice %d stripe_parms->lastIndiceOffset %ld\n",myrank,stripe_parms->lastDataTypeExtent,stripe_parms->lastFlatBufIndice,stripe_parms->lastIndiceOffset); +#endif + } + + } /// contig_access_count > 0 + + /* Synchronize all procs before the file write */ + if ((stripeSize == 0) || (stripe_parms->flushCB)) { +#ifdef onesidedtrace + printf("Rank %d - first barrier roundIter %d\n",myrank,roundIter); +#endif + MPI_Barrier(ca_data->comm); + } + + if ((iAmUsedAgg || stripe_parms->iWasUsedStripingAgg) && ((stripeSize == 0) || (stripe_parms->flushCB))) { + stripe_parms->iWasUsedStripingAgg = 0; + /* Determine what offsets define the portion of the file domain the agg is writing this round. + */ + if (iAmUsedAgg) { + if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) { + if (myAggRank == greatestFileDomainAggRank) { + if (fd_end[myAggRank] > lastFileOffset) + currentRoundFDEnd = lastFileOffset; + else + currentRoundFDEnd = fd_end[myAggRank]; + } + else + currentRoundFDEnd = fd_end[myAggRank]; + } + else + currentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset_CA)1; +#ifdef onesidedtrace + printf("current used agg about to writecontig - currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]); +#endif + } +#ifdef onesidedtrace + else { + printf("former used agg about to writecontig\n"); + } +#endif + int doWriteContig = 1; + int tmp_put_amt = ca_data->io_buf_put_amounts; + if (ca_data->use_dup) tmp_put_amt = ca_data->io_buf_put_amounts_d; + + if (!ca_data->onesided_no_rmw) { + if (stripeSize == 0) { + if (tmp_put_amt != ((int)(currentRoundFDEnd - currentRoundFDStart)+1)) { + doWriteContig = 0; + hole_found = 1; +#ifdef onesidedtrace + printf("hole found --- ca_data->io_buf_put_amounts is %d currentRoundFDEnd is %ld currentRoundFDStart is %ld on roundIter %d\n",tmp_put_amt,currentRoundFDEnd,currentRoundFDStart,roundIter); +#endif + } + } + else { // file striping + if (tmp_put_amt != stripe_parms->amountOfStripedDataExpected) { + doWriteContig = 0; + hole_found = 1; +#ifdef onesidedtrace + printf("striping hole found --- ca_data->io_buf_put_amounts is %d stripe_parms->amountOfStripedDataExpected is %d on roundIter %d\n",tmp_put_amt,stripe_parms->amountOfStripedDataExpected,roundIter); +#endif + } + } + if (ca_data->use_dup) + ca_data->io_buf_put_amounts_d = 0; + else + ca_data->io_buf_put_amounts = 0; + } + + if (doWriteContig) { + if (stripeSize > 0) { +#ifdef onesidedtrace + printf("about to write out %d stripes\n",stripe_parms->numStripesUsed); +#endif + + int stripeIter = 0; + for (stripeIter=0;stripeIternumStripesUsed;stripeIter++) { + +#ifdef onesidedtrace + printf("writing write_buf offset %ld len %ld file offset %ld\n",((ADIO_Offset_CA)stripeIter * (ADIO_Offset_CA)(stripeSize)),stripe_parms->stripeIOLens[stripeIter],stripe_parms->stripeIOoffsets[stripeIter]); +#endif + if (ca_data->check_req) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + MPI_File_iwrite_at(ca_data->fh, stripe_parms->stripeIOoffsets[stripeIter], (char*)(write_buf + ((ADIO_Offset_CA)stripeIter * (ADIO_Offset_CA)(stripeSize))), stripe_parms->stripeIOLens[stripeIter], MPI_BYTE, &ca_data->io_Request); + + if (ca_data->async_io_outer && 0) { + ca_data->check_req = 1; + } else { + 
MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + } + H5MM_free(stripe_parms->stripeIOLens); + H5MM_free(stripe_parms->stripeIOoffsets); + } + else { + + if (ca_data->check_req) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + MPI_File_iwrite_at(ca_data->fh, currentRoundFDStart, write_buf, (int)(currentRoundFDEnd - currentRoundFDStart)+1, MPI_BYTE, &ca_data->io_Request); + + if (ca_data->async_io_outer) { + ca_data->check_req = 1; + } else { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + } + } + } // iAmUsedAgg + + if (iAmUsedAgg && stripeSize == 0) { + currentRoundFDStart += coll_bufsize; + + if (ca_data->onesided_always_rmw && (roundIter<(numberOfRounds-1))) { // read in the buffer for the next round unless this is the last round + ADIO_Offset_CA tmpCurrentRoundFDEnd = 0; + if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) { + if (myAggRank == greatestFileDomainAggRank) { + if (fd_end[myAggRank] > lastFileOffset) + tmpCurrentRoundFDEnd = lastFileOffset; + else + tmpCurrentRoundFDEnd = fd_end[myAggRank]; + } + else + tmpCurrentRoundFDEnd = fd_end[myAggRank]; + } + else + tmpCurrentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset_CA)1; +#ifdef onesidedtrace + printf("Rank %d - ca_data->onesided_always_rmw - round %d buffer pre-read for file offsets %ld to %ld total is %d\n",myrank,roundIter, currentRoundFDStart,tmpCurrentRoundFDEnd,(int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1); +#endif + MPI_File_read_at(ca_data->fh, currentRoundFDStart, write_buf, (int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1, + MPI_BYTE, error_code); + } + } + + if (roundIter<(numberOfRounds-1)) { +#ifdef onesidedtrace + printf("second barrier roundIter %d --- waiting in loop this time\n",roundIter); +#endif + MPI_Barrier(ca_data->comm); + } + + } /* for-loop roundIter */ + +#ifdef onesidedtrace + printf("freeing datastructures\n"); +#endif + H5MM_free(targetAggsForMyData); + H5MM_free(targetAggsForMyDataFDStart); + H5MM_free(targetAggsForMyDataFDEnd); + + for (i=0;istripeSize; + int segmentIter = stripe_parms->segmentIter; + hsize_t bufTypeExtent = stripe_parms->bufTypeExtent; + + if ((stripeSize > 0) && stripe_parms->firstStripedIOCall) + stripe_parms->iWasUsedStripingAgg = 0; + +#ifdef onesidedtrace + if (buf == NULL) { + printf("H5FD_mpio_ccio_osagg_read - buf is NULL contig_access_count is %d\n",contig_access_count); + for (i=0;i 0) lenListOverZero = 1; + } + + *error_code = MPI_SUCCESS; /* initialize to success */ + + MPI_Status status; + + pthread_t io_thread; + void *thread_ret; + ThreadFuncData io_thread_args; + + int nprocs,myrank; + MPI_Comm_size(ca_data->comm, &nprocs); + MPI_Comm_rank(ca_data->comm, &myrank); + +#ifdef onesidedtrace + printf("Rank %d - H5FD_mpio_ccio_osagg_read started\n",myrank); +#endif + + if (ca_data->io_buf_window == MPI_WIN_NULL || ca_data->io_buf_put_amounts_window == MPI_WIN_NULL) + { + HDF5_ccio_win_setup(ca_data, nprocs); + } + + /* This flag denotes whether the source datatype is contiguous, which is referenced throughout the algorithm + * and defines how the source buffer offsets and data chunks are determined. If the value is 1 (true - contiguous data) + * things are profoundly simpler in that the source buffer offset for a given target offset simply linearly increases + * by the chunk sizes being written. If the value is 0 (non-contiguous) then these values are based on calculations + * from the flattened source datatype. 
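The flush path in the write algorithm above (MPI_File_iwrite_at plus the check_req bookkeeping) defers completion so the file write of one collective buffer can overlap aggregation of the next segment; the only rule is that an outstanding request must be waited on before its buffer is touched again. A minimal sketch of that deferred-completion pattern with hypothetical names and plain MPI calls:

    #include <mpi.h>

    /* Deferred-completion flush: start the write, remember that it is in
     * flight, and only wait when the collective buffer is about to be reused
     * (the role of check_req / MPIO_Wait above). */
    typedef struct { MPI_Request req; int in_flight; } pending_write_t;

    static void flush_async(MPI_File fh, MPI_Offset off, char *collbuf,
                            int nbytes, pending_write_t *pw)
    {
        if (pw->in_flight) {                   /* buffer still owned by MPI-IO */
            MPI_Wait(&pw->req, MPI_STATUS_IGNORE);
            pw->in_flight = 0;
        }
        MPI_File_iwrite_at(fh, off, collbuf, nbytes, MPI_BYTE, &pw->req);
        pw->in_flight = 1;                     /* completed lazily, before reuse */
    }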
+ */ + int bufTypeIsContig; + if (flatBuf->count == 1) + bufTypeIsContig = 1; + else + bufTypeIsContig = 0; + + if (!bufTypeIsContig) { + /* For a non-contiguous source buffer set the extent. */ + if ((stripeSize == 0) || stripe_parms->firstStripedIOCall) { + bufTypeExtent = flatBuf->extent; + } + +#ifdef onesidedtrace + printf("Rank %d - memFlatBuf->count is %d bufTypeExtent is %ld\n",myrank,flatBuf->count, bufTypeExtent); + for (i=0;icount;i++) + printf("Rank %d - flatBuf->blocklens[%d] is %d flatBuf->indices[%d] is %ld\n",myrank,i,flatBuf->blocklens[i],i,flatBuf->indices[i]); +#endif + } + + int naggs = ca_data->cb_nodes; + + /* Track the state of the source buffer for feeding the target data blocks. + * For GPFS the number of file domains per agg is always 1 so we just need 1 agg + * dimension to track the data, in the case of lustre we will need 2 dimensions + * agg and file domain since aggs write to multiple file domains in the case of lustre. + * This structure will be modified as the data is written to reflect the current state + * of the offset. + */ + +#ifdef onesidedtrace + printf("Rank %d - sizeof(FDSourceBufferState_CA) is %d - make sure is 32 for 32-byte memalign optimal\n",myrank,sizeof(FDSourceBufferState_CA)); +#endif + + FDSourceBufferState_CA *currentFDSourceBufferState = (FDSourceBufferState_CA *) H5MM_malloc(naggs * sizeof(FDSourceBufferState_CA)); + for (i=0;icb_buffer_size); + + /* Check if the I/O is (inner) asynchronous */ + if (ca_data->async_io_inner == 1) { + /* split buffer in half for asynchronous I/O */ + coll_bufsize = (ADIO_Offset_CA)(ca_data->cb_buffer_size/2); + } + + /* This logic defines values that are used later to determine what offsets define the portion + * of the file domain the agg is reading this round. + */ + int greatestFileDomainAggRank = -1,smallestFileDomainAggRank = -1; + ADIO_Offset_CA greatestFileDomainOffset = 0; + ADIO_Offset_CA smallestFileDomainOffset = lastFileOffset; + for (j=0;j greatestFileDomainOffset) { + greatestFileDomainOffset = fd_end[j]; + greatestFileDomainAggRank = j; + } + if (fd_start[j] < smallestFileDomainOffset) { + smallestFileDomainOffset = fd_start[j]; + smallestFileDomainAggRank = j; + } + if (ca_data->ranklist[j] == myrank) { + myAggRank = j; + if (fd_end[j] > fd_start[j]) { + iAmUsedAgg = 1; + stripe_parms->iWasUsedStripingAgg = 1; + } + } + } + +#ifdef onesidedtrace + printf("Rank %d - contig_access_count is %d lastFileOffset is %ld firstFileOffset is %ld\n",myrank,contig_access_count,lastFileOffset,firstFileOffset); + for (j=0;j numberOfRounds) + numberOfRounds = currentNumberOfRounds; + } + + /* Data structures to track what data this compute needs to receive from whom. + * For lustre they will all need another dimension for the file domain. + */ + int *sourceAggsForMyData = (int *) H5MM_malloc(naggs * sizeof(int)); + ADIO_Offset_CA *sourceAggsForMyDataFDStart = (ADIO_Offset_CA *)H5MM_malloc(naggs * sizeof(ADIO_Offset_CA)); + ADIO_Offset_CA *sourceAggsForMyDataFDEnd = (ADIO_Offset_CA *)H5MM_malloc(naggs * sizeof(ADIO_Offset_CA)); + int numSourceAggs = 0; + + /* This data structure holds the beginning offset and len list index for the range to be read + * coresponding to the round and source agg. Initialize to -1 to denote being unset. 
+ */ + int **sourceAggsForMyDataFirstOffLenIndex = (int **) H5MM_malloc(numberOfRounds * sizeof(int *)); + for (i = 0; i < numberOfRounds; i++) { + sourceAggsForMyDataFirstOffLenIndex[i] = (int *) H5MM_malloc(naggs * sizeof(int)); + for (j = 0; j < naggs; j++) + sourceAggsForMyDataFirstOffLenIndex[i][j] = -1; + } + + /* This data structure holds the ending offset and len list index for the range to be read + * coresponding to the round and source agg. + */ + int **sourceAggsForMyDataLastOffLenIndex = (int **) H5MM_malloc(numberOfRounds * sizeof(int *)); + for (i = 0; i < numberOfRounds; i++) + sourceAggsForMyDataLastOffLenIndex[i] = (int *) H5MM_malloc(naggs * sizeof(int)); + +#ifdef onesidedtrace + printf("Rank %d - NumberOfRounds is %d\n",myrank,numberOfRounds); + for (i=0;iranklist[%d] is %d fd_start is %ld fd_end is %ld\n",myrank,i,ca_data->ranklist[i],fd_start[i],fd_end[i]); + for (j=0;j 0) && !stripe_parms->firstStripedIOCall) { + currentDataTypeExtent = stripe_parms->lastDataTypeExtent; + currentFlatBufIndice = stripe_parms->lastFlatBufIndice; + currentIndiceOffset = stripe_parms->lastIndiceOffset; + } + + /* This denotes the coll_bufsize boundaries within the source buffer for reading for 1 round. + */ + ADIO_Offset_CA intraRoundCollBufsizeOffset = 0; + + /* This data structure tracks what source aggs need to be read to on what rounds. + */ + int *sourceAggsForMyDataCurrentRoundIter = (int *) H5MM_malloc(naggs * sizeof(int)); + for (i = 0; i < naggs; i++) + sourceAggsForMyDataCurrentRoundIter[i] = 0; + + + /* This is the first of the two main loops in this algorithm. + * The purpose of this loop is essentially to populate + * the data structures defined above for what read data blocks + * needs to go where (source agg and file domain) and when + * (round iter). For lustre essentially an additional layer of + * nesting will be required for the multiple file domains + * within the source agg. + */ + if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) { + int blockIter; + for (blockIter = 0; blockIter < contig_access_count; blockIter++) { + + /* Determine the starting source buffer offset for this block - for iter 0 skip it since that value is 0. + */ + if (blockIter > 0) { + if (bufTypeIsContig) { + currentRecvBufferOffset += len_list[blockIter - 1]; + } else { + /* Non-contiguous source datatype, count up the extents and indices to this point + * in the blocks. 
+ */ + ADIO_Offset_CA sourceBlockTotal = 0; + int lastIndiceUsed = currentFlatBufIndice; + int numNonContigSourceChunks = 0; + + while (sourceBlockTotal < len_list[blockIter - 1]) { + numNonContigSourceChunks++; + sourceBlockTotal += (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset); + lastIndiceUsed = currentFlatBufIndice; + currentFlatBufIndice++; + if (currentFlatBufIndice == flatBuf->count) { + currentFlatBufIndice = 0; + currentDataTypeExtent++; + } + currentIndiceOffset = (ADIO_Offset_CA) 0; + } + if (sourceBlockTotal > len_list[blockIter - 1]) { + currentFlatBufIndice--; + if (currentFlatBufIndice < 0) { + currentDataTypeExtent--; + currentFlatBufIndice = flatBuf->count - 1; + } + currentIndiceOffset = len_list[blockIter - 1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]); + } else + currentIndiceOffset = (ADIO_Offset_CA) 0; + maxNumContigOperations += (numNonContigSourceChunks + 2); + if (numNonContigSourceChunks > maxNumNonContigSourceChunks) + maxNumNonContigSourceChunks = numNonContigSourceChunks; + +#ifdef onesidedtrace + printf("blockiter %d currentFlatBufIndice is now %d currentDataTypeExtent is now %ld currentIndiceOffset is now %ld maxNumContigOperations is now %d\n",blockIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset,maxNumContigOperations); +#endif + } // !bufTypeIsContig + } // blockIter > 0 + + /* For the last iteration we need to include these maxNumContigOperations and maxNumNonContigSourceChunks + * for non-contig case even though we did not need to compute the next starting offset. + */ + if ((blockIter == (contig_access_count - 1)) && (!bufTypeIsContig)) { + ADIO_Offset_CA sourceBlockTotal = 0; + int tmpCurrentFlatBufIndice = currentFlatBufIndice; + int lastNumNonContigSourceChunks = 0; + while (sourceBlockTotal < len_list[blockIter]) { + lastNumNonContigSourceChunks++; + sourceBlockTotal += flatBuf->blocklens[tmpCurrentFlatBufIndice]; + tmpCurrentFlatBufIndice++; + if (tmpCurrentFlatBufIndice == flatBuf->count) { + tmpCurrentFlatBufIndice = 0; + } + } + maxNumContigOperations += (lastNumNonContigSourceChunks + 2); + if (lastNumNonContigSourceChunks > maxNumNonContigSourceChunks) + maxNumNonContigSourceChunks = lastNumNonContigSourceChunks; + } + + ADIO_Offset_CA blockStart = offset_list[blockIter]; + ADIO_Offset_CA blockEnd = offset_list[blockIter] + len_list[blockIter] - (ADIO_Offset_CA)1; + + /* Find the starting source agg for this block - normally it will be the current agg so guard the expensive + * while loop with a cheap if-check which for large numbers of small blocks will usually be false. + */ + if (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex]))) { + while (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex]))) + currentAggRankListIndex++; + }; + +#ifdef onesidedtrace + printf("Rank %d - currentAggRankListIndex is %d blockStart %ld blockEnd %ld fd_start[currentAggRankListIndex] %ld fd_end[currentAggRankListIndex] %ld\n",myrank,currentAggRankListIndex,blockStart,blockEnd,fd_start[currentAggRankListIndex],fd_end[currentAggRankListIndex]); +#endif + + /* Determine if this is a new source agg. + */ + if (blockIter > 0) { + if ((offset_list[blockIter - 1] + len_list[blockIter - 1] - (ADIO_Offset_CA) 1) < fd_start[currentAggRankListIndex]) { + numSourceAggs++; + } + } + + /* Determine which round to start reading. 
+ */ + if ((blockStart - fd_start[currentAggRankListIndex]) >= coll_bufsize) { + ADIO_Offset_CA currentRoundBlockStart = fd_start[currentAggRankListIndex]; + int startingRound = 0; + while (blockStart > (currentRoundBlockStart + coll_bufsize - (ADIO_Offset_CA) 1)) { + currentRoundBlockStart += coll_bufsize; + startingRound++; + } + sourceAggsForMyDataCurrentRoundIter[numSourceAggs] = startingRound; + } + + /* Initialize the data structures if this is the first offset in the round/source agg. + */ + if (sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] == -1) { + sourceAggsForMyData[numSourceAggs] = ca_data->ranklist[currentAggRankListIndex]; + sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[currentAggRankListIndex]; + /* Round up file domain to the first actual offset used if this is the first file domain. + */ + if (currentAggRankListIndex == smallestFileDomainAggRank) { + if (sourceAggsForMyDataFDStart[numSourceAggs] < firstFileOffset) + sourceAggsForMyDataFDStart[numSourceAggs] = firstFileOffset; + } + sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[currentAggRankListIndex]; + /* Round down file domain to the last actual offset used if this is the last file domain. + */ + if (currentAggRankListIndex == greatestFileDomainAggRank) { + if (sourceAggsForMyDataFDEnd[numSourceAggs] > lastFileOffset) + sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset; + } + sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; + + /* Set the source buffer state starting point for data access for this agg and file domain. + */ + if (bufTypeIsContig) { + if (currentFDSourceBufferState[numSourceAggs].sourceBufferOffset == -1) { + currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentRecvBufferOffset; +#ifdef onesidedtrace + printf("Rank %d - For agg %d sourceBufferOffset initialized to %ld\n",myrank,currentAggRankListIndex,currentRecvBufferOffset); +#endif + } + } else { + if (currentFDSourceBufferState[numSourceAggs].indiceOffset == -1) { + currentFDSourceBufferState[numSourceAggs].indiceOffset = currentIndiceOffset; + currentFDSourceBufferState[numSourceAggs].bufTypeExtent = bufTypeExtent; + currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentDataTypeExtent; + currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFlatBufIndice; +#ifdef onesidedtrace + printf("Rank %d - For agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld\n", myrank, numSourceAggs, currentDataTypeExtent, currentFlatBufIndice, currentIndiceOffset); +#endif + } + } + + intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((ADIO_Offset_CA) (sourceAggsForMyDataCurrentRoundIter[numSourceAggs] + 1) * coll_bufsize); + +#ifdef onesidedtrace + printf("Rank %d - init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld currentRecvBufferOffset set to %ld intraRoundCollBufsizeOffset set to %ld\n", myrank, numSourceAggs, blockIter, offset_list[blockIter], fd_start[currentAggRankListIndex], len_list[blockIter], currentRecvBufferOffset, intraRoundCollBufsizeOffset); +#endif + + } + + /* Replace the last offset block iter with this one. + */ + sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; + + /* If this blocks extends into the next file domain advance to the next source aggs and source buffer states. 
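/*
 * Illustrative helper (stand-in names) for the file-domain crossing case
 * handled below: walk forward through the aggregator list until the
 * domain containing block_end is reached, skipping aggregators whose
 * domain is empty (fd_start > fd_end), and count how many additional
 * source aggregators this block touches.
 */
#include <stdint.h>

static int advance_source_agg(int agg, int naggs,
                              const int64_t *fd_start, const int64_t *fd_end,
                              int64_t block_end, int *extra_aggs)
{
    *extra_aggs = 0;
    while ((agg + 1 < naggs) && (block_end > fd_end[agg])) {
        agg++;
        while ((agg + 1 < naggs) && (fd_start[agg] > fd_end[agg]))
            agg++;                       /* unused aggregator, skip it     */
        if (block_end >= fd_start[agg])
            (*extra_aggs)++;             /* block reaches into this domain */
    }
    return agg;
}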
+ */ + if (blockEnd > fd_end[currentAggRankListIndex]) { + + ADIO_Offset_CA amountToAdvanceSBOffsetForFD = 0; + int additionalFDCounter = 0; + + while (blockEnd > fd_end[currentAggRankListIndex]) { +#ifdef onesidedtrace + printf("Rank %d - block extends past current fd, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld total block size is %ld blockStart was %ld\n", myrank, blockEnd, fd_end[currentAggRankListIndex], len_list[blockIter], blockStart); + printf("Rank %d - currentAggRankListIndex is now %d blockEnd %ld > fd_end[%d] %ld\n", myrank, currentAggRankListIndex, blockEnd, currentAggRankListIndex, fd_end[currentAggRankListIndex]); +#endif + ADIO_Offset_CA thisAggBlockEnd = fd_end[currentAggRankListIndex]; + if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) { + while (thisAggBlockEnd >= intraRoundCollBufsizeOffset) { + sourceAggsForMyDataCurrentRoundIter[numSourceAggs]++; + intraRoundCollBufsizeOffset += coll_bufsize; + sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; + sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; +#ifdef onesidedtrace + printf("Rank %d - sourceAggsForMyDataCurrentRoundI%d] is now %d intraRoundCollBufsizeOffset is now %ld\n", myrank, numSourceAggs, sourceAggsForMyDataCurrentRoundIter[numSourceAggs], intraRoundCollBufsizeOffset); +#endif + } // while (thisAggBlockEnd >= intraRoundCollBufsizeOffset) + } // if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) + + int prevAggRankListIndex = currentAggRankListIndex; + currentAggRankListIndex++; + + /* Skip over unused aggs. + */ + if (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) { + while (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) + currentAggRankListIndex++; + } // (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) + + /* Start new source agg. + */ + if (blockEnd >= fd_start[currentAggRankListIndex]) { + numSourceAggs++; + sourceAggsForMyData[numSourceAggs] = ca_data->ranklist[currentAggRankListIndex]; + sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[currentAggRankListIndex]; + /* Round up file domain to the first actual offset used if this is the first file domain. + */ + if (currentAggRankListIndex == smallestFileDomainAggRank) { + if (sourceAggsForMyDataFDStart[numSourceAggs] < firstFileOffset) + sourceAggsForMyDataFDStart[numSourceAggs] = firstFileOffset; + } + sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[currentAggRankListIndex]; + /* Round down file domain to the last actual offset used if this is the last file domain. + */ + if (currentAggRankListIndex == greatestFileDomainAggRank) { + if (sourceAggsForMyDataFDEnd[numSourceAggs] > lastFileOffset) + sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset; + } + sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; + + /* For the first additonal file domain the source buffer offset + * will be incremented relative to the state of this first main + * loop but for subsequent full file domains the offset will be + * incremented by the size of the file domain. 
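/*
 * The advance amounts described above, as a small stand-alone helper
 * (names are illustrative): on the first spill only the part of the block
 * that fit in the previous file domain is consumed; every further spill
 * consumes a whole file domain.
 */
#include <stdint.h>

static int64_t advance_for_new_fd(int first_spill, int64_t block_start,
                                  int64_t prev_fd_start, int64_t prev_fd_end)
{
    if (first_spill)
        return (prev_fd_end - block_start) + 1;      /* tail of the block */
    return (prev_fd_end - prev_fd_start) + 1;        /* full file domain  */
}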
+ */ + if (additionalFDCounter == 0) + amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex] - blockStart) + (ADIO_Offset_CA) 1; + else + amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex] - fd_start[prevAggRankListIndex]) + (ADIO_Offset_CA) 1; + + if (bufTypeIsContig) { + HDassert(numSourceAggs > 0); + if (currentFDSourceBufferState[numSourceAggs].sourceBufferOffset == -1) { + if (additionalFDCounter == 0) { // first file domain, still use the current data counter + currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentRecvBufferOffset + amountToAdvanceSBOffsetForFD; + } else { // 2nd file domain, advance full file domain from last source buffer state + currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentFDSourceBufferState[numSourceAggs - 1].sourceBufferOffset + amountToAdvanceSBOffsetForFD; + } +#ifdef onesidedtrace + printf("Rank %d - Crossed into new FD - for agg %d sourceBufferOffset initialized to %ld amountToAdvanceSBOffsetForFD is %ld\n", myrank, numSourceAggs, currentFDSourceBufferState[numSourceAggs].sourceBufferOffset, amountToAdvanceSBOffsetForFD); +#endif + } + } else if (currentFDSourceBufferState[numSourceAggs].indiceOffset == -1) { + + /* non-contiguos source buffer */ + HDassert(numSourceAggs > 0); + + /* Initialize the source buffer state appropriately and then + * advance it with the nonContigSourceDataBufferAdvance function. + */ + if (additionalFDCounter == 0) { + // first file domain, still use the current data counter + currentFDSourceBufferState[numSourceAggs].indiceOffset = currentIndiceOffset; + currentFDSourceBufferState[numSourceAggs].bufTypeExtent = bufTypeExtent; + currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentDataTypeExtent; + currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFlatBufIndice; + } else { + // 2nd file domain, advance full file domain from last source buffer state + currentFDSourceBufferState[numSourceAggs].indiceOffset = + currentFDSourceBufferState[numSourceAggs - 1].indiceOffset; + currentFDSourceBufferState[numSourceAggs].bufTypeExtent = + currentFDSourceBufferState[numSourceAggs - 1].bufTypeExtent; + currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentFDSourceBufferState[numSourceAggs - 1].dataTypeExtent; + currentFDSourceBufferState[numSourceAggs].flatBufIndice = + currentFDSourceBufferState[numSourceAggs - 1].flatBufIndice; + } + H5FD_mpio_nc_buffer_advance(((char *) buf), flatBuf, (int) amountToAdvanceSBOffsetForFD, 0, ¤tFDSourceBufferState[numSourceAggs], NULL); + +#ifdef onesidedtrace + printf("Rank %d - Crossed into new FD - for agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld amountToAdvanceSBOffsetForFD is %d\n", myrank, numSourceAggs, currentFDSourceBufferState[numSourceAggs].dataTypeExtent, currentFDSourceBufferState[numSourceAggs].flatBufIndice, currentFDSourceBufferState[numSourceAggs].indiceOffset, amountToAdvanceSBOffsetForFD); +#endif + } + additionalFDCounter++; + +#ifdef onesidedtrace + printf("Rank %d - block extended beyond fd init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n", myrank, numSourceAggs, blockIter, offset_list[blockIter], fd_start[currentAggRankListIndex], len_list[blockIter]); +#endif + intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + coll_bufsize; + sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; + + } // if (blockEnd >= 
fd_start[currentAggRankListIndex]) + } // while (blockEnd > fd_end[currentAggRankListIndex]) + } // if (blockEnd > fd_end[currentAggRankListIndex]) + + /* If we are still in the same file domain / source agg but have gone past the coll_bufsize and need + * to advance to the next round handle this situation. + */ + if (blockEnd >= intraRoundCollBufsizeOffset) { + ADIO_Offset_CA currentBlockEnd = blockEnd; + while (currentBlockEnd >= intraRoundCollBufsizeOffset) { + sourceAggsForMyDataCurrentRoundIter[numSourceAggs]++; + intraRoundCollBufsizeOffset += coll_bufsize; + sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; + sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter; +#ifdef onesidedtrace + printf("Rank %d - block less than fd currentBlockEnd is now %ld intraRoundCollBufsizeOffset is now %ld sourceAggsForMyDataCurrentRoundIter[%d] is now %d\n", myrank, currentBlockEnd, intraRoundCollBufsizeOffset, numSourceAggs, sourceAggsForMyDataCurrentRoundIter[numSourceAggs]); +#endif + } // while (currentBlockEnd >= intraRoundCollBufsizeOffset) + } // if (blockEnd >= intraRoundCollBufsizeOffset) + + /* Need to advance numSourceAggs if this is the last source offset to + * include this one. + */ + if (blockIter == (contig_access_count - 1)) + numSourceAggs++; + } + +#ifdef onesidedtrace + printf("Rank %d - numSourceAggs is %d\n", myrank, numSourceAggs); + /*for (i = 0; i < numSourceAggs; i++) { + for (j = 0; j <= sourceAggsForMyDataCurrentRoundIter[i]; j++) + printf("Rank %d - sourceAggsForMyData[%d] is %d sourceAggsForMyDataFDStart[%d] is %ld sourceAggsForMyDataFDEnd is %ld sourceAggsForMyDataFirstOffLenIndex is %d with value %ld sourceAggsForMyDataLastOffLenIndex is %d with value %ld\n", myrank, i, sourceAggsForMyData[i], i, sourceAggsForMyDataFDStart[i], sourceAggsForMyDataFDEnd[i], sourceAggsForMyDataFirstOffLenIndex[j][i], offset_list[sourceAggsForMyDataFirstOffLenIndex[j][i]], sourceAggsForMyDataLastOffLenIndex[j][i], offset_list[sourceAggsForMyDataLastOffLenIndex[j][i]]); + }*/ +#endif + + } // if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) + + H5MM_free(sourceAggsForMyDataCurrentRoundIter); + + int currentReadBuf = 0; + int useIOBuffer = 0; + + /* Check if the I/O is asynchronous */ + if ((ca_data->async_io_inner == 1) && (numberOfRounds > 1)) { + if (ca_data->pthread_io == 1) { + useIOBuffer = 1; + io_thread = pthread_self(); + } else { + ca_data->async_io_inner = 0; + } + } + + /* use the two-phase buffer allocated in the file_open - no app should ever + * be both reading and reading at the same time */ + char *read_buf0 = ca_data->io_buf; + char *read_buf1 = ca_data->io_buf + coll_bufsize; + + /* Async I/O - Adjust if this is the "duplicate" buffer */ + if (ca_data->use_dup) { + read_buf0 = ca_data->io_buf_d; + read_buf1 = ca_data->io_buf_d + coll_bufsize; + } + + /* use the two-phase buffer allocated in the file_open - no app should ever + * be both reading and reading at the same time */ + char *read_buf = read_buf0; + MPI_Win read_buf_window = ca_data->io_buf_window; + + /* Async I/O - Adjust if this is the "duplicate" buffer */ + if (ca_data->use_dup) { + read_buf_window = ca_data->io_buf_window_d; + } + + ADIO_Offset_CA currentRoundFDStart = 0, nextRoundFDStart = 0; + ADIO_Offset_CA currentRoundFDEnd = 0, nextRoundFDEnd = 0; + + if (iAmUsedAgg) { + currentRoundFDStart = fd_start[myAggRank]; + nextRoundFDStart = fd_start[myAggRank]; + if 
(myAggRank == smallestFileDomainAggRank) { + if (currentRoundFDStart < firstFileOffset) + currentRoundFDStart = firstFileOffset; + if (nextRoundFDStart < firstFileOffset) + nextRoundFDStart = firstFileOffset; + } else if (myAggRank == greatestFileDomainAggRank) { + if (currentRoundFDEnd > lastFileOffset) + currentRoundFDEnd = lastFileOffset; + if (nextRoundFDEnd > lastFileOffset) + nextRoundFDEnd = lastFileOffset; + } +#ifdef onesidedtrace + printf("Rank %d - iAmUsedAgg - currentRoundFDStart initialized to %ld currentRoundFDEnd to %ld\n", myrank, currentRoundFDStart, currentRoundFDEnd); +#endif + } // if iAmUsedAgg + +#ifdef onesidedtrace + MPI_Barrier(ca_data->comm); + if(myrank==0) { printf("\n\n"); fflush(stdout); } + MPI_Barrier(ca_data->comm); + printf("Rank %d is waiting at barrier between main loops.\n", myrank); + printf("Rank %d -- numberOfRounds = %d, contig_access_count = %d, numSourceAggs = %d\n", myrank, numberOfRounds, contig_access_count, numSourceAggs); + fflush(stdout); + MPI_Barrier(ca_data->comm); + if(myrank==0) { printf("\n\n"); fflush(stdout); } + MPI_Barrier(ca_data->comm); +#endif + + /* This is the second main loop of the algorithm, actually nested loop of + * aggs within rounds. There are 2 flavors of this. + * For onesided_read_aggmethod of 1 each nested iteration for the source agg + * does an mpi_get on a contiguous chunk using a primative datatype + * determined using the data structures from the first main loop. + * For onesided_read_aggmethod of 2 each nested iteration for the source agg + * builds up data to use in created a derived data type for 1 mpi_get that + * is done for the target agg for each round. + * To support lustre there will need to be an additional layer of nesting + * for the multiple file domains within target aggs. + */ + int roundIter; + for (roundIter = 0; roundIter < numberOfRounds; roundIter++) { + + if (iAmUsedAgg || stripe_parms->iWasUsedStripingAgg) { + stripe_parms->iWasUsedStripingAgg = 0; + +#ifdef onesidedtrace + printf("Rank %d - roundIter %ld of %ld - currentRoundFDEnd = %ld \n", myrank, roundIter, numberOfRounds, currentRoundFDEnd); +#endif + + /* determine what offsets define the portion of the file domain the agg is reading this round */ + if (iAmUsedAgg) { + + currentRoundFDStart = nextRoundFDStart; + + if (!useIOBuffer || (roundIter == 0)) { + + ADIO_Offset_CA amountDataToReadThisRound; + if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) { + currentRoundFDEnd = fd_end[myAggRank]; + amountDataToReadThisRound = ((currentRoundFDEnd - currentRoundFDStart) + 1); + } else { + currentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset_CA) 1; + amountDataToReadThisRound = coll_bufsize; + } + +#ifdef onesidedtrace + printf("Rank %d - amountDataToReadThisRound=%ld - myAggRank=%ld - fd_end[myAggRank]=%ld - currentRoundFDStart=%ld - currentRoundFDEnd=%ld - coll_bufsize=%ld\n", myrank, amountDataToReadThisRound, myAggRank, fd_end[myAggRank], currentRoundFDStart, currentRoundFDEnd, coll_bufsize); +#endif + + /* + * Don't actually do the read if it was already done + * (asynchronously) outside this function call... 
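/*
 * Sketch of the pre-read handshake referenced above, using plain MPI
 * calls in place of the driver's request bookkeeping (MPIO_Wait and
 * ca_data->check_req): H5FD_mpio_ccio_file_read, further below, posts a
 * nonblocking read and records a pending flag, so the aggregation loop
 * only has to complete that request before touching the buffer.
 */
#include <mpi.h>

static void post_pre_read(MPI_File fh, MPI_Offset off, void *buf, int nbytes,
                          MPI_Request *req, int *req_pending)
{
    MPI_File_iread_at(fh, off, buf, nbytes, MPI_BYTE, req);
    *req_pending = 1;
}

static void complete_pre_read(MPI_Request *req, int *req_pending)
{
    if (*req_pending) {
        MPI_Wait(req, MPI_STATUS_IGNORE);   /* collective buffer now valid */
        *req_pending = 0;
    }
}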
+ */ + if (do_file_read && amountDataToReadThisRound>0) { +#ifdef onesidedtrace + printf("Rank %d - calling MPI_File_read_at\n", myrank); +#endif + + if (ca_data->check_req) { + MPIO_Wait(&ca_data->io_Request, error_code); + ca_data->check_req = 0; + } + + /* read currentRoundFDEnd bytes */ + MPI_File_read_at(ca_data->fh, currentRoundFDStart, read_buf, amountDataToReadThisRound, MPI_BYTE, &status); + +#ifdef onesidedtrace + printf("Rank %d - Finishing MPI_File_read_at (offset=%d,size=%d)\n", myrank, currentRoundFDStart, amountDataToReadThisRound); + fflush(stdout); +#endif + } /* if (do_file_read) */ + + currentReadBuf = 1; + + } /* (!useIOBuffer || (roundIter == 0)) */ + if (useIOBuffer) { + + /* use the thread reader for the next round */ + /* switch back and forth between the read buffers so that the data aggregation code is diseminating 1 buffer while the thread is reading into the other */ + if (roundIter > 0) currentRoundFDEnd = nextRoundFDEnd; // Does this do anything? + + if (roundIter < (numberOfRounds - 1)) { + +#ifdef onesidedtrace + printf("Rank %d - Calc amountDataToReadNextRound...\n", myrank); + fflush(stdout); +#endif + nextRoundFDStart += coll_bufsize; + ADIO_Offset_CA amountDataToReadNextRound; + if ((fd_end[myAggRank] - nextRoundFDStart) < coll_bufsize) { + nextRoundFDEnd = fd_end[myAggRank]; + amountDataToReadNextRound = ((nextRoundFDEnd - nextRoundFDStart) + 1); + } else { + nextRoundFDEnd = nextRoundFDStart + coll_bufsize - (ADIO_Offset_CA) 1; + amountDataToReadNextRound = coll_bufsize; + } +#ifdef onesidedtrace + printf("Rank %d - nextRoundFDEnd = %ld, amountDataToReadNextRound = %ld.\n", myrank, nextRoundFDEnd, amountDataToReadNextRound); + fflush(stdout); + printf("Rank %d - myAggRank=%ld - fd_end[myAggRank]=%ld - nextRoundFDStart=%ld - nextRoundFDEnd=%ld - coll_bufsize=%ld\n", myrank, myAggRank, fd_end[myAggRank], nextRoundFDStart, nextRoundFDEnd, coll_bufsize); +#endif + if ( !pthread_equal(io_thread, pthread_self()) ) { + +#ifdef onesidedtrace + printf("Rank %d - Need pthread join.\n", myrank); + fflush(stdout); +#endif + pthread_join(io_thread, &thread_ret); + + int error_code_thread = *(int *) thread_ret; + if (error_code_thread != MPI_SUCCESS) { + printf("Rank %d - pthread_join FAILED!, error_code_thread = %d\n", myrank, error_code_thread); + fflush(stdout); + return; + } + io_thread = pthread_self(); + } + + /* do a little pointer shuffling: background I/O works from one + * buffer while two-phase machinery fills up another */ + if (currentReadBuf == 0) { + read_buf = read_buf1; + currentReadBuf = 1; + io_thread_args.buf = read_buf0; + } else { + read_buf = read_buf0; + currentReadBuf = 0; + io_thread_args.buf = read_buf1; + } + io_thread_args.fh = ca_data->fh; + io_thread_args.myrank = myrank; + io_thread_args.io_kind = READ_CA; + io_thread_args.size = amountDataToReadNextRound; + io_thread_args.offset = nextRoundFDStart; + io_thread_args.error_code = *error_code; + + if (amountDataToReadNextRound > 0) { +#ifdef onesidedtrace + printf("Rank %d - calling pthread_create (size=%ld,offset=%ld)\n", myrank, io_thread_args.size, io_thread_args.offset); + printf("Rank %d - (size=%ld,amountDataToReadNextRound=%ld)\n", myrank, io_thread_args.size, amountDataToReadNextRound); + fflush(stdout); +#endif + if ((pthread_create(&io_thread, NULL, IO_Thread_Func, &(io_thread_args))) != 0) + io_thread = pthread_self(); +#ifdef onesidedtrace + printf("Rank %d - pthread_create DONE.\n", myrank); +#endif + } + } else { /* last round */ + + if (!pthread_equal(io_thread, 
pthread_self())) { + + pthread_join(io_thread, &thread_ret); + int error_code_thread = *(int *) thread_ret; + if (error_code_thread != MPI_SUCCESS) { + printf("Rank %d - Last pthread_join FAILED!, error_code_thread = %d\n", myrank, error_code_thread); + fflush(stdout); + return; + } + io_thread = pthread_self(); + + } + if (currentReadBuf == 0) { + read_buf = read_buf1; + } else { + read_buf = read_buf0; + } + + } + } /* useIOBuffer */ + + } /* IAmUsedAgg */ + else if (useIOBuffer) { + if (roundIter < (numberOfRounds - 1)) { + if (currentReadBuf == 0) { + currentReadBuf = 1; + read_buf = read_buf1; + } else { + currentReadBuf = 0; + read_buf = read_buf0; + } + } else { + if (currentReadBuf == 0) { + read_buf = read_buf1; + } else { + read_buf = read_buf0; + } + } + } + + } // (iAmUsedAgg || stripe_parms->iWasUsedStripingAgg) + +#ifdef onesidedtrace + printf("Rank %d - Hitting MPI_Barrier.\n", myrank); +#endif + + /* wait until the read buffers are full before we start pulling from the source procs */ + MPI_Barrier(ca_data->comm); + + if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) { + + int aggIter; + for (aggIter = 0; aggIter < numSourceAggs; aggIter++) { + + /* If we have data for the round/agg process it. + */ + if (sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter] != -1) { + + ADIO_Offset_CA currentRoundFDStartForMySourceAgg = (ADIO_Offset_CA) ((ADIO_Offset_CA) sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset_CA) ((ADIO_Offset_CA) roundIter * coll_bufsize)); + ADIO_Offset_CA currentRoundFDEndForMySourceAgg = (ADIO_Offset_CA) ((ADIO_Offset_CA) sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset_CA) ((ADIO_Offset_CA) (roundIter + 1) * coll_bufsize) - (ADIO_Offset_CA) 1); + + int sourceAggContigAccessCount = 0; + + /* These data structures are used for the derived datatype mpi_get + * in the onesided_read_aggmethod of 2 case. + */ + int *sourceAggBlockLengths = NULL; + MPI_Aint *sourceAggDisplacements = NULL, *recvBufferDisplacements = NULL; + MPI_Datatype *sourceAggDataTypes = NULL; + char *derivedTypePackedSourceBuffer = NULL; + int derivedTypePackedSourceBufferOffset = 0; + int allocatedDerivedTypeArrays = 0; + ADIO_Offset_CA amountOfDataReadThisRoundAgg = 0; + + /* Process the range of offsets for this source agg. + */ + int offsetIter; + int startingOffLenIndex = sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter]; + int endingOffLenIndex = sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter]; + for (offsetIter = startingOffLenIndex; offsetIter <= endingOffLenIndex; offsetIter++) { + + if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[aggIter]) + currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[aggIter]; + + ADIO_Offset_CA offsetStart = offset_list[offsetIter], offsetEnd = (offset_list[offsetIter] + len_list[offsetIter] - (ADIO_Offset_CA) 1); + + /* Determine the amount of data and exact source buffer offsets to use. 
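/*
 * The three-way branch below computes the overlap of one access block
 * with this round's window of the aggregator's file domain; it reduces to
 * a clamp of the two interval ends (helper names are illustrative).
 */
#include <stdint.h>

#define CA_MAX(a,b) (((a) > (b)) ? (a) : (b))
#define CA_MIN(a,b) (((a) < (b)) ? (a) : (b))

static int64_t bytes_in_round(int64_t block_start, int64_t block_end,
                              int64_t round_start, int64_t round_end,
                              int64_t *clamped_start)
{
    int64_t lo = CA_MAX(block_start, round_start);
    int64_t hi = CA_MIN(block_end, round_end);

    *clamped_start = lo;
    return (hi >= lo) ? (hi - lo + 1) : 0;   /* 0 bytes if no overlap */
}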
+ */ + int bufferAmountToRecv = 0; + + if ((offsetStart >= currentRoundFDStartForMySourceAgg) && (offsetStart <= currentRoundFDEndForMySourceAgg)) { + if (offsetEnd > currentRoundFDEndForMySourceAgg) + bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - offsetStart) + 1; + else + bufferAmountToRecv = (offsetEnd - offsetStart) + 1; + } else if ((offsetEnd >= currentRoundFDStartForMySourceAgg) && (offsetEnd <= currentRoundFDEndForMySourceAgg)) { + if (offsetEnd > currentRoundFDEndForMySourceAgg) + bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) + 1; + else + bufferAmountToRecv = (offsetEnd - currentRoundFDStartForMySourceAgg) + 1; + if (offsetStart < currentRoundFDStartForMySourceAgg) { + offsetStart = currentRoundFDStartForMySourceAgg; + } + } else if ((offsetStart <= currentRoundFDStartForMySourceAgg) && (offsetEnd >= currentRoundFDEndForMySourceAgg)) { + bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) + 1; + offsetStart = currentRoundFDStartForMySourceAgg; + } + + if (bufferAmountToRecv > 0) { /* we have data to recv this round */ + if (ca_data->onesided_read_aggmethod == 2) { + /* Only allocate these arrays if we are using method 2 and only do it once for this round/source agg. + */ + if (!allocatedDerivedTypeArrays) { + sourceAggBlockLengths = (int *) H5MM_malloc(maxNumContigOperations * sizeof(int)); + sourceAggDisplacements = (MPI_Aint *) H5MM_malloc(maxNumContigOperations * sizeof(MPI_Aint)); + recvBufferDisplacements = (MPI_Aint *) H5MM_malloc(maxNumContigOperations * sizeof(MPI_Aint)); + sourceAggDataTypes = (MPI_Datatype *) H5MM_malloc(maxNumContigOperations * sizeof(MPI_Datatype)); + if (!bufTypeIsContig) { + int k; + for (k = sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter]; k <= sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter]; k++) + amountOfDataReadThisRoundAgg += len_list[k]; + +#ifdef onesidedtrace + printf("Rank %d - derivedTypePackedSourceBuffer mallocing %ld\n", myrank,amountOfDataReadThisRoundAgg); +#endif + if (amountOfDataReadThisRoundAgg > 0) + derivedTypePackedSourceBuffer = (char *) H5MM_malloc(amountOfDataReadThisRoundAgg * sizeof(char)); + else + derivedTypePackedSourceBuffer = NULL; + } + allocatedDerivedTypeArrays = 1; + } + } + + /* Determine the offset into the source window. + */ + ADIO_Offset_CA sourceDisplacementToUseThisRound = (ADIO_Offset_CA) (offsetStart - currentRoundFDStartForMySourceAgg); + + /* If using the thread reader select the appropriate side of the split window. + */ + if (useIOBuffer && (read_buf == read_buf1)) { + sourceDisplacementToUseThisRound += (ADIO_Offset_CA) coll_bufsize; + } + + + /* For onesided_read_aggmethod of 1 do the mpi_get using the primitive MPI_BYTE type from each + * contiguous chunk from the target, if the source is non-contiguous then unpack the data after + * the MPI_Win_unlock is done to make sure the data has arrived first. 
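/*
 * Minimal form of the aggmethod-1 transfer described above: one
 * passive-target MPI_Get per contiguous chunk, bracketed by a shared lock
 * on the aggregator's collective-buffer window.  The data is only
 * guaranteed to be in `dst` once MPI_Win_unlock returns, which is why the
 * non-contiguous unpack below happens after the unlock.
 */
#include <mpi.h>

static void get_chunk(void *dst, int nbytes, int src_agg_rank,
                      MPI_Aint disp_in_agg_buf, MPI_Win agg_buf_win)
{
    MPI_Win_lock(MPI_LOCK_SHARED, src_agg_rank, 0, agg_buf_win);
    MPI_Get(dst, nbytes, MPI_BYTE,
            src_agg_rank, disp_in_agg_buf, nbytes, MPI_BYTE, agg_buf_win);
    MPI_Win_unlock(src_agg_rank, agg_buf_win);
}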
+ */ + if (ca_data->onesided_read_aggmethod == 1) { + + MPI_Win_lock(MPI_LOCK_SHARED, sourceAggsForMyData[aggIter], 0, read_buf_window); + + char *getSourceData = NULL; + if (bufTypeIsContig) { + + MPI_Get(((char *) buf) + currentFDSourceBufferState[aggIter].sourceBufferOffset, bufferAmountToRecv, MPI_BYTE, sourceAggsForMyData[aggIter], sourceDisplacementToUseThisRound, bufferAmountToRecv, MPI_BYTE, read_buf_window); + currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset_CA) bufferAmountToRecv; + + } + else { + + getSourceData = (char *) H5MM_malloc(bufferAmountToRecv * sizeof(char)); + MPI_Get(getSourceData, bufferAmountToRecv, MPI_BYTE, sourceAggsForMyData[aggIter], sourceDisplacementToUseThisRound, bufferAmountToRecv, MPI_BYTE, read_buf_window); + + } + + MPI_Win_unlock(sourceAggsForMyData[aggIter], read_buf_window); + + if (!bufTypeIsContig) { + H5FD_mpio_nc_buffer_advance(((char *) buf), flatBuf, bufferAmountToRecv, 0, ¤tFDSourceBufferState[aggIter], getSourceData); + H5MM_free(getSourceData); + } + } + + /* For onesided_read_aggmethod of 2 populate the data structures for this round/agg for this offset iter + * to be used subsequently when building the derived type for 1 mpi_get for all the data for this + * round/agg. + */ + else if (ca_data->onesided_read_aggmethod == 2) { + + if (bufTypeIsContig) { + sourceAggBlockLengths[sourceAggContigAccessCount] = bufferAmountToRecv; + sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE; + sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound; + recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint) currentFDSourceBufferState[aggIter].sourceBufferOffset; + currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset_CA) bufferAmountToRecv; + sourceAggContigAccessCount++; + } + else { + sourceAggBlockLengths[sourceAggContigAccessCount] = bufferAmountToRecv; + sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE; + sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound; + recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint) derivedTypePackedSourceBufferOffset; + derivedTypePackedSourceBufferOffset += (ADIO_Offset_CA) bufferAmountToRecv; + sourceAggContigAccessCount++; + } + } + } // bufferAmountToRecv > 0 + } // contig list + + /* For onesided_read_aggmethod of 2 now build the derived type using + * the data from this round/agg and do 1 single mpi_get. 
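/*
 * Minimal form of the aggmethod-2 transfer described above (names are
 * illustrative): describe every piece of this round/aggregator pair on
 * both sides with struct datatypes and move all of it with one MPI_Get.
 */
#include <mpi.h>
#include <stdlib.h>

static void get_round_with_struct_types(void *local_buf, int npieces,
                                        int *lens, MPI_Aint *local_disps,
                                        MPI_Aint *remote_disps,
                                        int src_agg_rank, MPI_Win agg_buf_win)
{
    MPI_Datatype *types = (MPI_Datatype *)malloc((size_t)npieces * sizeof(MPI_Datatype));
    MPI_Datatype  local_type, remote_type;
    int i;

    for (i = 0; i < npieces; i++)
        types[i] = MPI_BYTE;

    MPI_Type_create_struct(npieces, lens, local_disps,  types, &local_type);
    MPI_Type_create_struct(npieces, lens, remote_disps, types, &remote_type);
    MPI_Type_commit(&local_type);
    MPI_Type_commit(&remote_type);

    MPI_Win_lock(MPI_LOCK_SHARED, src_agg_rank, 0, agg_buf_win);
    MPI_Get(local_buf, 1, local_type,
            src_agg_rank, 0, 1, remote_type, agg_buf_win);
    MPI_Win_unlock(src_agg_rank, agg_buf_win);

    MPI_Type_free(&local_type);
    MPI_Type_free(&remote_type);
    free(types);
}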
+ */ + if (ca_data->onesided_read_aggmethod == 2) { + MPI_Datatype recvBufferDerivedDataType, sourceBufferDerivedDataType; + + MPI_Type_create_struct(sourceAggContigAccessCount, sourceAggBlockLengths, recvBufferDisplacements, sourceAggDataTypes, &recvBufferDerivedDataType); + MPI_Type_commit(&recvBufferDerivedDataType); + MPI_Type_create_struct(sourceAggContigAccessCount, sourceAggBlockLengths, sourceAggDisplacements, sourceAggDataTypes, &sourceBufferDerivedDataType); + MPI_Type_commit(&sourceBufferDerivedDataType); + + if (sourceAggContigAccessCount > 0) { + + MPI_Win_lock(MPI_LOCK_SHARED, sourceAggsForMyData[aggIter], 0, read_buf_window); + + if (bufTypeIsContig) { + + MPI_Get(((char *) buf), 1, recvBufferDerivedDataType, sourceAggsForMyData[aggIter], 0, 1, sourceBufferDerivedDataType, read_buf_window); + + } else { + + MPI_Get(derivedTypePackedSourceBuffer, 1, recvBufferDerivedDataType, sourceAggsForMyData[aggIter], 0, 1, sourceBufferDerivedDataType, read_buf_window); + + } + + MPI_Win_unlock(sourceAggsForMyData[aggIter], read_buf_window); + + if (!bufTypeIsContig) { + + H5FD_mpio_nc_buffer_advance(((char *) buf), flatBuf, derivedTypePackedSourceBufferOffset, 0, ¤tFDSourceBufferState[aggIter], derivedTypePackedSourceBuffer); + + } + } + + if (allocatedDerivedTypeArrays) { + H5MM_free(sourceAggBlockLengths); + H5MM_free(sourceAggDisplacements); + H5MM_free(sourceAggDataTypes); + H5MM_free(recvBufferDisplacements); + if (!bufTypeIsContig) { + if (derivedTypePackedSourceBuffer != NULL) + H5MM_free(derivedTypePackedSourceBuffer); + } + } + if (sourceAggContigAccessCount > 0) { + MPI_Type_free(&recvBufferDerivedDataType); + MPI_Type_free(&sourceBufferDerivedDataType); + } + } + } // baseoffset != -1 + } // source aggs + + if (stripeSize > 0) { + stripe_parms->lastDataTypeExtent = currentFDSourceBufferState[numSourceAggs-1].dataTypeExtent; + stripe_parms->lastFlatBufIndice = currentFDSourceBufferState[numSourceAggs-1].flatBufIndice; + stripe_parms->lastIndiceOffset = currentFDSourceBufferState[numSourceAggs-1].indiceOffset; + } + + } /* contig_access_count > 0 */ + + /* the source procs recv the requested data to the aggs */ + + /* Synchronize all procs */ + MPI_Barrier(ca_data->comm); + + nextRoundFDStart = currentRoundFDStart + coll_bufsize; + + } /* for-loop roundIter */ + + if (useIOBuffer) { /* thread readr cleanup */ + if (!pthread_equal(io_thread, pthread_self())) { + pthread_join(io_thread, &thread_ret); + *error_code = *(int *) thread_ret; + } + } + + H5MM_free(sourceAggsForMyData); + H5MM_free(sourceAggsForMyDataFDStart); + H5MM_free(sourceAggsForMyDataFDEnd); + + for (i = 0; i < numberOfRounds; i++) { + H5MM_free(sourceAggsForMyDataFirstOffLenIndex[i]); + H5MM_free(sourceAggsForMyDataLastOffLenIndex[i]); + } + H5MM_free(sourceAggsForMyDataFirstOffLenIndex); + H5MM_free(sourceAggsForMyDataLastOffLenIndex); + H5MM_free(currentFDSourceBufferState); + + return; + } /* H5FD_mpio_ccio_osagg_read */ + + +/*------------------------------------------------------------------------- + * Function: H5FD_mpio_ccio_file_read + * + * Purpose: Read data from file into aggregators + * + *------------------------------------------------------------------------- + */ +void H5FD_mpio_ccio_file_read(CustomAgg_FH_Data ca_data, int *error_code, + ADIO_Offset_CA firstFileOffset, ADIO_Offset_CA lastFileOffset, + ADIO_Offset_CA *fd_start, ADIO_Offset_CA* fd_end) + { + int i,j; /* generic iterators */ + int naggs, iAmUsedAgg, myAggRank; + MPI_Status status; + int nprocs, myrank; + int 
greatestFileDomainAggRank, smallestFileDomainAggRank; + ADIO_Offset_CA greatestFileDomainOffset, smallestFileDomainOffset; + ADIO_Offset_CA coll_bufsize; + ADIO_Offset_CA readFDStart; + ADIO_Offset_CA readFDEnd; + + *error_code = MPI_SUCCESS; /* initialize to success */ + MPI_Comm_size(ca_data->comm, &nprocs); + MPI_Comm_rank(ca_data->comm, &myrank); + + naggs = ca_data->cb_nodes; + iAmUsedAgg = 0; /* whether or not this rank is used as an aggregator. */ + myAggRank = -1; /* if I am an aggregor this is my index into ranklist */ + coll_bufsize = (ADIO_Offset_CA)(ca_data->cb_buffer_size); + + /* + * Confirm that we are only dealing with ONE round here... + */ + int numberOfRounds = 0; + for (j=0;j numberOfRounds) + numberOfRounds = currentNumberOfRounds; + } + if (numberOfRounds > 1) { + printf("ERROR -- Use of H5FD_mpio_ccio_file_read assumes there are is only ONE round for the current aggregation segment!\n"); + } + +#ifdef onesidedtrace + printf("Rank %d (use_dup == %d) called H5FD_mpio_ccio_file_read with segmentFirstFileOffset %d, segmentLastFileOffset %d, segment_stripe_start %d, segment_stripe_end %d. \n",myrank, ca_data->use_dup, (int)firstFileOffset, (int)lastFileOffset, (int)fd_start[0], (int)fd_end[0]); +#endif + + /* This logic defines values that are used later to determine what offsets define the portion + * of the file domain the agg is reading this round. + */ + greatestFileDomainAggRank = -1; + smallestFileDomainAggRank = -1; + greatestFileDomainOffset = 0; + smallestFileDomainOffset = lastFileOffset; + for (j=0;j greatestFileDomainOffset) { + greatestFileDomainOffset = fd_end[j]; + greatestFileDomainAggRank = j; + } + if (fd_start[j] < smallestFileDomainOffset) { + smallestFileDomainOffset = fd_start[j]; + smallestFileDomainAggRank = j; + } + if (ca_data->ranklist[j] == myrank) { + myAggRank = j; + if (fd_end[j] > fd_start[j]) { + iAmUsedAgg = 1; + } + } + } + + readFDStart = 0; + readFDEnd = 0; + if (iAmUsedAgg) { + + /* What offset to read from */ + readFDStart = fd_start[myAggRank]; + if (myAggRank == smallestFileDomainAggRank) { + if (readFDStart < firstFileOffset) + readFDStart = firstFileOffset; + } else if (myAggRank == greatestFileDomainAggRank) { + if (readFDEnd > lastFileOffset) + readFDEnd = lastFileOffset; + } + + /* How much data to read */ + int read_size; + if ((fd_end[myAggRank] - readFDStart) < coll_bufsize) { + readFDEnd = fd_end[myAggRank]; + read_size = ((readFDEnd - readFDStart) + 1); + } else { + readFDEnd = readFDStart + coll_bufsize - (ADIO_Offset_CA) 1; + read_size = coll_bufsize; + } + + /* Read 'read_size' bytes */ + if (ca_data->use_dup) { + MPI_File_iread_at(ca_data->fh, readFDStart, ca_data->io_buf_d, read_size, MPI_BYTE, &ca_data->io_Request_d); + ca_data->check_req_d = 1; + }else { + MPI_File_iread_at(ca_data->fh, readFDStart, ca_data->io_buf, read_size, MPI_BYTE, &ca_data->io_Request); + ca_data->check_req = 1; + } + + } // (iAmUsedAgg) + + /* Synchronize all procs */ + MPI_Barrier(ca_data->comm); + + return; + } /* H5FD_mpio_ccio_file_read */ + +/*------------------------------------------------------------------------- + * Function: calc_file_domains + * + * Purpose: Compute a dynamic access range based file domain partition + * among I/O aggregators, which align to the GPFS block size + * Divide the I/O workload among aggregation processes. This is + * done by (logically) dividing the file into file domains (FDs); each + * process may directly access only its own file domain. 
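/*
 * Simplified sketch of block-aligned partitioning (the real
 * calc_file_domains below distributes the remainder more carefully):
 * split [min_off, max_off] into naggs pieces whose size is rounded up to
 * a multiple of blksize, so no two aggregators share a file-system block;
 * an empty domain is marked by fd_start > fd_end.
 */
#include <stdint.h>

static void simple_block_aligned_domains(int64_t min_off, int64_t max_off,
                                         int naggs, int64_t blksize,
                                         int64_t *fd_start, int64_t *fd_end)
{
    int64_t total   = max_off - min_off + 1;
    int64_t fd_size = ((total / naggs + blksize - 1) / blksize) * blksize;
    int     i;

    if (fd_size < blksize)
        fd_size = blksize;                      /* at least one block each   */

    for (i = 0; i < naggs; i++) {
        fd_start[i] = min_off + (int64_t)i * fd_size;
        fd_end[i]   = fd_start[i] + fd_size - 1;
        if (fd_end[i] > max_off)
            fd_end[i] = max_off;
        if (fd_start[i] > max_off) {            /* nothing left for this agg */
            fd_start[i] = max_off + 1;
            fd_end[i]   = max_off;
        }
    }
}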
+ * Additional effort is to make sure that each I/O aggregator gets + * a file domain that aligns to the GPFS block size. So, there will + * not be any false sharing of GPFS file blocks among multiple I/O nodes. + * + * Return: Void. + * + *------------------------------------------------------------------------- + */ +void calc_file_domains(ADIO_Offset_CA *st_offsets, ADIO_Offset_CA *end_offsets, + int nprocs, int nprocs_for_coll, ADIO_Offset_CA *min_st_offset_ptr, + ADIO_Offset_CA **fd_start_ptr, ADIO_Offset_CA **fd_end_ptr, + ADIO_Offset_CA *fd_size_ptr, ADIO_Offset_CA blksize) +{ + ADIO_Offset_CA min_st_offset, max_end_offset, *fd_start, *fd_end, *fd_size; + int i, aggr; + +#ifdef onesidedtrace + printf("calc_file_domains: Blocksize=%ld, nprocs=%ld, nprocs_for_coll=%ld\n",blksize,nprocs,nprocs_for_coll); +#endif + /* find min of start offsets and max of end offsets of all processes */ + min_st_offset = st_offsets [0]; + max_end_offset = end_offsets[0]; + for (i=1; imyrank); + fflush(stdout); +#endif + if (args->size > 0) { + if (args->io_kind == READ_CA) { + args->error_code = MPI_File_read_at(args->fh, args->offset, args->buf, args->size, MPI_BYTE, &(args->error_code)); + } else { + args->error_code = MPI_File_write_at(args->fh, args->offset, args->buf, args->size, MPI_BYTE, &(args->error_code)); + } +#ifdef onesidedtrace + int eclass, len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(args->error_code, estring, &len); + printf("Rank %d - Leaving IO_Thread_Func with CODE %d: %s (int: %d) (offset=%d,size=%d)\n", args->myrank, eclass, estring, args->error_code, args->offset, args->size); + fflush(stdout); +#endif + } else { + args->error_code = 0; +#ifdef onesidedtrace + printf("Rank %d - WARNING: Leaving IO_Thread_Func WITHOUT doing IO OP (size = %d)\n", args->myrank, args->size); + fflush(stdout); +#endif + } + pthread_exit(&(args->error_code)); + return NULL; +} + +#endif /* H5_HAVE_PARALLEL */ diff --git a/src/H5FDmpio_topology.h b/src/H5FDmpio_topology.h new file mode 100644 index 0000000..5011d9b --- /dev/null +++ b/src/H5FDmpio_topology.h @@ -0,0 +1,922 @@ + +/* + * Programmers: Richard Zamora + * August 2018, (last modified: December 11th, 2018) + * + * Francois Tessier + * August 2018 + * + * Purpose: This is the topology API, which can be used to select optimal + * aggregator ranks for collective IO opperations. + * + */ + +/*********************/ +/* Define Statements */ +/*********************/ + +/* + * HOWTO Information: + * + * To add a new system, add a new #ifdef block below. At a minimum, you must + * define the NETWORK_BANDWIDTH and NETWORK_LATENCY. You should also add + * '#include' statements for any libraries you need to implement + * machine-specific code in `distance_between_ranks()` and in + * `distance_to_io_node()`. You will probably also need to use/modify the + * `rank_to_coordinates()` function to actually calculate rank-rank hop + * distances. + * + * For systems with PMI support (e.g. #include is available), follow + * the THETA example for calculating rank-rank distances. 
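/*
 * Generic illustration of the coordinate-based distance calculation
 * suggested above for new systems (the THETA and BGQ blocks below are the
 * real machine-specific versions): once per-rank coordinates are
 * available, sum the per-dimension hops, taking the shorter way around
 * any torus dimension.
 */
static int hops_between(const int *a, const int *b,
                        const int *dim_size, const int *is_torus, int ndims)
{
    int d, hops, total = 0;

    for (d = 0; d < ndims; d++) {
        hops = a[d] - b[d];
        if (hops < 0)
            hops = -hops;
        if (is_torus[d] && ((dim_size[d] - hops) < hops))
            hops = dim_size[d] - hops;    /* shorter way around the ring */
        total += hops;
    }
    return total;
}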
+ * + */ + +/* Machine-specific Defs/Includes for Theta Cray XC40 (@ALCF) */ +#ifdef THETA +#include +#include /* Not used in this version of the API */ +#include /* Not used in this version of the API */ +#define LNETS_PER_OST 7 +#define MAX_IONODES 392 /* 56 OSTs * 7 LNET */ +#define NETWORK_BANDWIDTH 1800000 +#define NETWORK_LATENCY 30 + +/* Machine-specific Defs/Includes for IBM BG/Q Mira/Vesta (@ALCF) */ +#elif defined (BGQ) +#include +#include +#include +#include +#include +#include +#define NETWORK_BANDWIDTH 1800000 +#define NETWORK_LATENCY 30 + +/* Default Machine Defs */ +#else +#define NETWORK_BANDWIDTH 1800000 +#define NETWORK_LATENCY 30 + +#endif + +#define TMIN(a,b) (((a)<(b))?(a):(b)) +#define TMAX(a,b) (((a)>(b))?(a):(b)) +#define LARGE_PENALTY 10000000.0 +#define SMALL_PENALTY 10000.0 /* Note: This penalty is currently arbitrary */ +#define MAX_STR 1024 +#define DBGRANKS 1 // Only shows ranklist on rank==0 if DBGRANKS==0 +//#define topo_debug + +/* + * MPI_CHECK_H5 will display a custom error message as well as an error string + * from the MPI_STATUS and then exit the program. This macro is borrowed + * directly from the HPC-IOR code + */ +#define MPI_CHECK_H5(MPI_STATUS, MSG) do { \ + char resultString[MPI_MAX_ERROR_STRING]; \ + int resultLength; \ + \ + if (MPI_STATUS != MPI_SUCCESS) { \ + MPI_Error_string(MPI_STATUS, resultString, &resultLength); \ + fprintf(stdout, "ior ERROR: %s, MPI %s, (%s:%d)\n", \ + MSG, resultString, __FILE__, __LINE__); \ + fflush(stdout); \ + MPI_Abort(MPI_COMM_WORLD, -1); \ + } \ +} while(0) + + +/*********************/ +/* Special Type Defs */ +/*********************/ + + +typedef struct cost cost; + +struct cost { + double cost; + int rank; +}; + +/* + * AGGSelect is used to select the desired aggregation selection routine + * DATA -> Try to maximize data-movement bandwidth + * SPREAD -> Spread out aggregators using topology information + * STRIDED -> Spread out aggregators according a given stride (using the rank IDs) + * RANDOM -> Use random rank selection for aggregator placement + */ +enum AGGSelect{DEFAULT, DATA, SPREAD, STRIDED, RANDOM}; + + +/************************/ +/* Function Definitions */ +/************************/ + + +/*------------------------------------------------------------------------- + * Function: CountProcsPerNode + * + * Purpose: Count the number of mpi procs that share a host. + * + * Return: The count (int) + * + * NOTE: This also assumes that the task count on all nodes is equal + * to the task count on the host running MPI task 0. + * + *------------------------------------------------------------------------- + */ +static int CountProcsPerNode(int numTasks, int rank, MPI_Comm comm) +{ + char localhost[MAX_STR]; + char hostname0[MAX_STR]; + static int firstPass = true; + unsigned count; + unsigned flag; + int rc; + + rc = gethostname(localhost, MAX_STR); + if (rc == -1) { + /* This node won't match task 0's hostname...except in the + * case where ALL gethostname() calls fail, in which + * case ALL nodes will appear to be on the same node. + * We'll handle that later. 
*/ + localhost[0] = '\0'; + if (rank == 0) perror("gethostname() failed"); + } + + /* send task 0's hostname to all tasks */ + if (rank == 0) strcpy(hostname0, localhost); + MPI_CHECK_H5(MPI_Bcast(hostname0, MAX_STR, MPI_CHAR, 0, comm), "broadcast of task 0's hostname failed"); + if (strcmp(hostname0, localhost) == 0) flag = 1; + else flag = 0; + + /* count the tasks share the same host as task 0 */ + MPI_Allreduce(&flag, &count, 1, MPI_UNSIGNED, MPI_SUM, comm); + + if (hostname0[0] == '\0') count = 1; + return (int)count; +} + +/*------------------------------------------------------------------------- + * Function: network_bandwidth + * + * Purpose: system-dependent (hard-coded) network bandwidth (bytes/ms) + * + * Return: Bandwidth (int64_t) in bytes/ms + * + *------------------------------------------------------------------------- + */ +int64_t network_bandwidth () { + return NETWORK_BANDWIDTH; +} + +/*------------------------------------------------------------------------- + * Function: network_latency + * + * Purpose: system-dependent (hard-coded) network latency (ms) + * + * Return: Latency (int64_t) in milliseconds + * + *------------------------------------------------------------------------- + */ +int64_t network_latency () { + return NETWORK_LATENCY; +} + +/*------------------------------------------------------------------------- + * Function: rank_to_coordinates + * + * Purpose: Given the rank, return the topology coordinates + * + * Return: int* coord array pointer + * + *------------------------------------------------------------------------- + */ +void rank_to_coordinates ( int rank, int* coord ) { +#ifdef THETA + pmi_mesh_coord_t xyz; + int nid; + + /* Hypothesis : PMI_rank == MPI_rank */ + PMI_Get_nid(rank, &nid); + PMI_Get_meshcoord((pmi_nid_t) nid, &xyz); + + coord[0] = xyz.mesh_x; + coord[1] = xyz.mesh_y; + coord[2] = xyz.mesh_z; + coord[3] = nid; + coord[4] = sched_getcpu(); +#elif defined (BGQ) + MPIX_Rank2torus( rank, coord ); +#endif +} + +/*------------------------------------------------------------------------- + * Function: distance_between_ranks + * + * Purpose: Given two ranks, return the number of hops a message needs + * to take to travel between them. + * + * Return: int distance (number of hops between ranks) + * + *------------------------------------------------------------------------- + */ +int distance_between_ranks ( int src_rank, int dest_rank, int ppn, int pps ) { + int distance = 0; + +#ifdef THETA + int dim = 4, d; + int src_coord[dim], dest_coord[dim]; + + rank_to_coordinates ( src_rank, src_coord ); + rank_to_coordinates ( dest_rank, dest_coord ); + + for ( d = 0; d < dim; d++ ) { + if ( src_coord[d] != dest_coord[d] ) + distance++; + } +#elif defined (BGQ) + int dim=6, d, hops; + int src_coord[6], dest_coord[6]; + MPIX_Hardware_t hw; + + rank_to_coordinates ( src_rank, src_coord ); + rank_to_coordinates ( dest_rank, dest_coord ); + + MPIX_Hardware( &hw ); + //dim = hw.torus_dimension; // Should return "6" + + /* Note: dont count last dimension.. it refers to cores on same node */ + for ( d = 0; d < dim-1; d++ ) { + hops = abs ( dest_coord[d] - src_coord[d] ); + if ( hw.isTorus[d] == 1 ) + hops = TMIN ( hops, (int)hw.Size[d] - hops ); + distance += hops; + } +#else + /* + * If we don't have topology information, but do know ppn & pps (per socket), + * just assume simple rank ordering. + * Assume 2 hops between nodes & 1 between sockets. 
+ */ + int same_node = 1; + int same_soc = 0; + if (ppn > 0) { + int ind_i = (src_rank / ppn) * ppn; + int ind_f = (src_rank / ppn) * ppn + ppn; + if ( (dest_rank < ind_i) || (dest_rank >= ind_f) ) { + // NOT Inside same 'node' + same_node = 0; + } + } + if (same_node && (pps > 0)) { + int ind_i = (src_rank / pps) * pps; + int ind_m = (src_rank / pps) * pps + pps; + if ( (dest_rank >= ind_i) && (dest_rank < ind_m) ) same_soc = 1; + } + if (!same_soc) distance++; + if (!same_node) distance++; +#endif + + return distance; +} + +/***********************/ +/* THETA-Specific Defs */ +/***********************/ + +#ifdef THETA + + /*------------------------------------------------------------------------- + * Function: fgr_to_lnets + * + * Purpose: Given a formatted OST label, return the possible lnet nodes. + * + * Return: int *lnet is populated. + * + *------------------------------------------------------------------------- + */ + void fgr_to_lnets ( char *fgr_id, int *lnet ) { + int count = 0; + FILE *fp; + char fline[100]; + char *lnet_list, *item; + + fp = fopen("/etc/lnet/routes.conf", "r"); + + if ( fp == NULL ) { + fprintf ( stdout, "[ERROR] Error while opening routes.conf file!\n" ); + return; + } + + while ( fgets ( fline, 100, fp ) != NULL ) { + + const char *c = strstr ( fline, fgr_id ); + + if ( c != NULL ) { + const char *b1 = strstr ( fline, "[" ) + 1; + const char *b2 = strstr ( fline, "]" ); + lnet_list = ( char * ) malloc ( sizeof ( char ) * ( b2 - b1 + 1 ) ); + strncpy ( lnet_list, b1, b2 - b1 ); + item = strtok ( lnet_list, "," ); + + while ( item ) { + lnet [ count ] = atoi ( item ); + item = strtok ( 0, "," ); + count++; + } + } + count = 0; + } + + fclose ( fp ); + return; + } + + /*------------------------------------------------------------------------- + * Function: io_nodes_per_file + * + * Purpose: Given a file name, determine the LNET nodes. + * + * Return: Number of LNET nodes responsible for file. + * Populate (int *) nodesList with the LNET nodes. + * + * Note: Assume 7 LNET nodes per OST + * + *------------------------------------------------------------------------- + */ + int io_nodes_per_file ( char* filename, int *nodesList ) { + int err, stripeCount, nLnets, i, idx, oid, l; + char fgrId [20]; + int *ssuId, *ostId, *lnets; + struct find_param param = { 0 }; + int ssu2fgr [] = { 0, 0, 0, 0, + 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, + 4, 5, 4, 5, 4, 5, 4, 5, 4, 5, 4, 5, 4, 5, + 6, 7, 6, 7, 6, 7, 6, 7, 6, 7, 6, 7, 6, 7, + 8, 9, 8, 9, 8, 9, 8, 9, 8, 9, 8, 9, 8, 9}; + + err = llapi_getstripe ( filename, ¶m ); + if ( err ) + fprintf ( stdout, "[ERROR] llapi_getstripe\n"); + + stripeCount = (¶m)->fp_lmd->lmd_lmm.lmm_stripe_count; + nLnets = stripeCount * LNETS_PER_OST; + + ssuId = (int *) malloc ( stripeCount * sizeof ( int ) ); + ostId = (int *) malloc ( stripeCount * sizeof ( int ) ); + + /* Hypothesis : OSS id == SNX - 4 */ + for ( i = 0; i < stripeCount; i++ ) { + idx = (¶m)->fp_lmd->lmd_lmm.lmm_objects[i].l_ost_idx; + ssuId[i] = idx + 4; + lnets = (int *) malloc ( LNETS_PER_OST * sizeof ( int ) ); + + snprintf ( fgrId, 20, "o2ib100%d", ssu2fgr[ ssuId[i] ] ); + + fgr_to_lnets ( fgrId, lnets ); + + for ( l = 0; l < LNETS_PER_OST; l++ ) + nodesList [ i * LNETS_PER_OST + l ] = lnets [ l ]; + + free ( lnets ); + } + + return nLnets; + } + + +#endif + + +/*------------------------------------------------------------------------- + * Function: distance_to_io_node + * + * Purpose: Given a rank, determine distance to the nearest IO node. 
+ * + * Return: Number of hops needed to reach the nearest IO node. + * + *------------------------------------------------------------------------- + */ +int distance_to_io_node ( int src_rank ) { +#ifdef THETAIO + + /* + * On Theta/LUSTRE, each OST will be served by 7 different IO nodes. This + * means the writing/reading of a specific stripe (on a specific OST) will + * require the aggregator to interact with up to 7 different IO nodes + * durring the course of a collective I/O operation. + * + * The following code cannot be used to actually determine the exact/avg + * IO-node distace, but it is included in case we get ideas later. :) + */ + int nodesList[MAX_IONODES]; + int n_lnets, i; + n_lnets = io_nodes_per_file ( "/lus/theta-fs0/projects/datascience/rzamora/topology/1D-ARRAY-00000000.dat", nodesList ); + for ( i = 0; i < n_lnets; i++ ) { + fprintf (stdout, "%d ", nodesList[i]); + } + + /* + * Note: This function needs to be improved to actually calculate the distance to IO nodes... + * Fore now, just setting distance to 1: + */ + return 1; + +#elif defined (BGQ) + return MPIX_IO_distance (); + +#endif + + /* Default Value */ + return 1; +} + +/*------------------------------------------------------------------------- + * Function: topology_aware_list_serial + * + * Purpose: Given a `tally` array (of bytes needed to/from each of nb_aggr + * aggregators) on each rank, determine optimal list of ranks + * to act as the aggregators (agg_list). + * + * Return: 0 == Success. Populates int* agg_list + * + * Note: "Serial" label refers to the fact that agg_list must be + * populated one at a time (for now). + * + *------------------------------------------------------------------------- + */ +int topology_aware_list_serial ( int64_t* tally, int64_t nb_aggr, int* agg_list, int ppn, int pps, MPI_Comm comm ) +{ + int i, r, agg_ind, aggr_nprocs, nprocs, latency, bandwidth, distance_to_io, distance, rank; + int agg_to_calc, aggr_comm_rank, aggr_comm_size, ranks_per_agg, ind_i, ind_m, ind_f; + MPI_Comm_rank ( comm, &rank ); + MPI_Comm_size ( comm, &nprocs ); + int64_t *data_distribution; + double base_cost_penalty = 1; + int trim_thresh = 1; + int min_stride; + int *world_ranks; + cost aggr_cost, min_cost; + + latency = network_latency (); /* */ + bandwidth = network_bandwidth (); + data_distribution = (int64_t *) malloc (nprocs * sizeof(int64_t)); + world_ranks = (int *) malloc (nprocs * sizeof(int)); + min_stride = nprocs / nb_aggr; + + /* Loop through the aggregators (this is the `serial` part) */ + for (agg_ind=0; agg_ind 0) { + for (r = nprocs-1; r >= 0; r-- ) { + if (data_distribution[r] < trim_thresh) { + aggr_nprocs--; + for (i = r; i < aggr_nprocs; i++) { + data_distribution[i] = data_distribution[i+1]; + world_ranks[i] = world_ranks[i+1]; + } + } + } + } + + /* Compute the cost of aggregating data from the other ranks */ + for (r = 0; r < aggr_nprocs; r++ ) { + if ( (rank != world_ranks[r]) && (data_distribution[r] > 0)) { + distance = distance_between_ranks ( rank, world_ranks[r], ppn, pps ); + //printf("Rank %d - agg_ind = %d r = %d distance = %d \n", rank, agg_ind, r , distance); + aggr_cost.cost += ( distance * latency + data_distribution[r] / bandwidth ); + } + } + distance_to_io = distance_to_io_node ( rank ); + aggr_cost.cost += distance_to_io * latency; + + /* If "this" rank was selected as the "strided" initial list, give it slight preference */ + if ( aggr_cost.rank == agg_list[ agg_ind ]) aggr_cost.cost-=latency; + + /* Determine the aggr with minimum cost */ + 
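/*
 * The reduction used just below, in isolation: every rank pairs its own
 * aggregation cost with its rank and MPI_MINLOC selects the globally
 * cheapest candidate (ties go to the lower rank).  The pair layout must
 * match MPI_DOUBLE_INT, i.e. a double followed by an int, which is what
 * the `cost` struct above provides.
 */
#include <mpi.h>

static int cheapest_rank(double my_cost, MPI_Comm comm)
{
    struct { double cost; int rank; } in, out;

    MPI_Comm_rank(comm, &in.rank);
    in.cost = my_cost;
    MPI_Allreduce(&in, &out, 1, MPI_DOUBLE_INT, MPI_MINLOC, comm);
    return out.rank;    /* this rank becomes the next aggregator */
}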
//printf("agg_ind = %d aggr_cost.rank = %d aggr_cost.cost = %f\n",agg_ind,aggr_cost.rank,aggr_cost.cost); + MPI_Allreduce ( &aggr_cost, &min_cost, 1, MPI_DOUBLE_INT, MPI_MINLOC, comm ); + agg_list[ agg_ind ] = min_cost.rank; + //if (rank == 0) printf("agg_ind = %d min_cost.rank = %d min_cost.cost = %f\n",agg_ind,min_cost.rank,min_cost.cost); + + //printf("agg_ind = %d rank = %d base_cost_penalty = %f aggr_cost.cost = %f\n",agg_ind,rank,base_cost_penalty,aggr_cost.cost); + + /* + * Increase `base_cost_penalty` for rank that was just chosen. + * Add smaller penalty if rank is on same node as the last selection. + */ + if (min_cost.rank == rank) + base_cost_penalty += LARGE_PENALTY; + else { + distance = distance_between_ranks ( rank, min_cost.rank, ppn, pps ); + if (distance < 1) + base_cost_penalty += SMALL_PENALTY; + } + + } + + free(data_distribution); + free(world_ranks); + return 0; +} + +/*------------------------------------------------------------------------- + * Function: add_chunk + * + * Purpose: Helper function to recursively populate an array of byte + * quantities, with each index corresponding to the aggregator + * index where that quatitiy of data will be read from or written to. + * + * Return: 0 == Success. Populates int64_t* tally + * + *------------------------------------------------------------------------- + */ +int add_chunk ( int64_t datalen, int64_t offset, int64_t buffer_size, int64_t nb_aggr, int64_t* tally ) +{ + + int64_t agg_offset = 0; + int64_t amount_add = 0; + int64_t amount_left = 0; + + agg_offset = offset % (nb_aggr * buffer_size) ; /* Position of the start of the chunk wrt the round */ + agg_offset = agg_offset / buffer_size ; /* Which aggregator owns the start of this chunk */ + amount_add = buffer_size - (agg_offset % buffer_size) ; /* How many bytes belong in the starting agg */ + if (amount_add > datalen) { /* chunk ends within the same agg */ + amount_add = datalen; + } else { + amount_left = datalen - amount_add; + if (amount_left > 0) { + offset += amount_add; + datalen-= amount_add; + add_chunk ( datalen, offset, buffer_size, nb_aggr, tally ); + } + } + tally[ agg_offset ] += amount_add; + return 0; +} + +/*------------------------------------------------------------------------- + * Function: get_cb_config_list + * + * Purpose: Generate a ROMIO cb_config_list hint string to select an + * optimal list of aggregator nodes. + * + * Return: 0 == Success. Populates char* hint_str + * + * Note: This will only work if there is one rank per node, and fewer + * aggregators than nodes. 
+ * (and if the cb_config_list hint is not ignored) + * + * Assumption: + * --------------------------------------------------------------- + * | agg 1 round 1 | agg 2 round 1 | agg 1 round 2 | agg 2 round 2 | + * --------------------------------------------------------------- + * + *------------------------------------------------------------------------- + */ +int get_cb_config_list ( int64_t* data_lens, int64_t* offsets, int data_len, char* hint_str, int64_t buffer_size, int64_t nb_aggr, MPI_Comm comm ) +{ + int rank, nprocs, i, r, resultlen; + int* agg_list; + int64_t *data_to_send_per_aggr; + char name[MPI_MAX_PROCESSOR_NAME]; + char name_buf[MPI_MAX_PROCESSOR_NAME]; + char* cb_reverse = getenv("HDF5_CB_REV"); + MPI_Comm_rank ( comm, &rank ); + MPI_Comm_size ( comm, &nprocs ); + MPI_Get_processor_name( name, &resultlen ); + int ppn = CountProcsPerNode(nprocs, rank, comm); + int pps = ppn; + + /* Tally data quantities associated with each aggregator */ + data_to_send_per_aggr = (int64_t *) calloc (nb_aggr, sizeof (int64_t)); + for ( r = 0; r < data_len; r++ ) { + add_chunk ( data_lens[r], offsets[r], buffer_size, nb_aggr, data_to_send_per_aggr ); + } + + /* Generate topology-aware list of aggregators */ + agg_list = (int *) calloc (nprocs, sizeof (int)); + topology_aware_list_serial( data_to_send_per_aggr, nb_aggr, agg_list, ppn, pps, comm ); + + /* Reverse the order of the agg list..? */ + if ( cb_reverse && (strcmp(cb_reverse,"yes") == 0) ) { + for (i=0, r=nb_aggr-1; i (nprocs-1))) { + good = 0; + break; + } + } + if (good == 1) break; + } + agg_list[ agg_ind ] = r; + } + } + /* Bcast random list to other ranks */ + MPI_Bcast(&agg_list[0], nb_aggr, MPI_INT, 0, comm); + return 0; +} + +/*------------------------------------------------------------------------- + * Function: get_ranklist_strided + * + * Purpose: Just return a strided selection of ranks + * + * Return: 0 == Success. Populates int* agg_list + * + *------------------------------------------------------------------------- + */ +int get_ranklist_strided ( int64_t nb_aggr, int* agg_list, int stride, MPI_Comm comm ) +{ + int agg_ind, nprocs; + MPI_Comm_size ( comm, &nprocs ); + if (stride < 1) stride = nprocs / nb_aggr; + /* Use rank-0 to crate a random agg placement */ + for (agg_ind=0; agg_ind Try to maximize data-movement bandwidth + * SPREAD -> Spread out aggregators using topology information + * STRIDED -> Spread out aggregators according a given stride (using the rank IDs) + * RANDOM -> Use random rank selection for aggregator placement + * + * + * fd_mapping == 0 Assumption: + * --------------------------------------------------------------- + * | agg 1 round 1 | agg 2 round 1 | agg 1 round 2 | agg 2 round 2 | + * --------------------------------------------------------------- + * + * fd_mapping == 1 Assumption (Use contiguous file domains for each agg): + * --------------------------------------------------------------- + * | agg 1 round 1 | agg 1 round 2 | agg 2 round 1 | agg 2 round 2 | + * --------------------------------------------------------------- + * + * Return: 0 == Success. 
Populates int *ranklist + * + *------------------------------------------------------------------------- + */ +int topology_aware_ranklist ( int64_t* data_lens, int64_t* offsets, int data_len, + int *ranklist, int64_t buffer_size, int64_t nb_aggr, int ppn, int pps, + int stride, MPI_Comm comm, enum AGGSelect select_type, int fd_mapping ) +{ + int r; + int64_t *data_to_send_per_aggr; + double min_off_g, max_off_g, min_off_l, max_off_l; + int64_t min_off, max_off, off; + int64_t st_agg, in_0, in_1; + +#ifdef topo_debug + int rank, myrank, nprocs; + MPI_Comm_rank ( comm, &myrank ); + MPI_Comm_size ( comm, &nprocs ); +#endif + + switch(select_type) { + + case DATA : + { + /* Tally data quantities associated with each aggregator */ + data_to_send_per_aggr = (int64_t *) calloc (nb_aggr, sizeof (int64_t)); + + if (fd_mapping==1) { /* GPFS-style mapping */ + + /* get local min and max offsets */ + + min_off = offsets[0]; + max_off = offsets[0] + data_lens[0]; + for ( r = 1; r < data_len; r++ ) { + if (offsets[r] < min_off) + min_off = offsets[r]; + off = offsets[r] + data_lens[r]; + if (off > max_off) + max_off = off; + } + min_off_l = (double) min_off; + max_off_l = (double) max_off; + + /* Use allreduce to get global min and max offsets */ + MPI_Allreduce ( &min_off_l, &min_off_g, 1, MPI_DOUBLE, MPI_MIN, comm ); + MPI_Allreduce ( &max_off_l, &max_off_g, 1, MPI_DOUBLE, MPI_MAX, comm ); + + min_off = (int64_t) min_off_g; + max_off = (int64_t) max_off_g; + +#ifdef topo_debug + if (DBGRANKS > 0) { + for (rank=0;rank %ld]", offsets[r], offsets[r]+data_lens[r] ); + printf("\n"); + fflush(stdout); + } + MPI_Barrier(comm); + } + } +#endif + + /* Loop through data to add counts to known file domains */ + int64_t fd_size = (max_off - min_off) / nb_aggr; + for ( r = 0; r < data_len; r++ ) { + st_agg = (offsets[r] - min_off) / fd_size; + //in_0 = (offsets[r]-min_off) % fd_size; + in_0 = ((st_agg + 1) * fd_size) - (offsets[r] - min_off); + in_0 = TMIN ( in_0, data_lens[r] ); + in_1 = TMAX(0, data_lens[r] - in_0); + data_to_send_per_aggr[ (int) st_agg ] += in_0; + data_to_send_per_aggr[ (int) ((st_agg+1)%nb_aggr) ] += in_1; + } + + } else { /* LUSTRE-style mapping */ + for ( r = 0; r < data_len; r++ ) { + add_chunk ( data_lens[r], offsets[r], buffer_size, nb_aggr, data_to_send_per_aggr ); + } + } + + /* Generate topology-aware list of aggregators */ + topology_aware_list_serial( data_to_send_per_aggr, nb_aggr, ranklist, ppn, pps, comm ); + break; + } + case SPREAD : + { + /* Generate spread-out list of aggregators */ + get_ranklist_spread ( nb_aggr, ranklist, ppn, pps, comm ); + break; + } + case STRIDED : + { + /* Generate constant-strided list of aggregators */ + get_ranklist_strided ( nb_aggr, ranklist, stride, comm ); + break; + } + case RANDOM : + { + /* Generate random list of aggregators */ + get_ranklist_random ( nb_aggr, ranklist, comm ); + break; + } + default : + { + /* Generate random list of aggregators */ + get_ranklist_strided ( nb_aggr, ranklist, 0, comm ); + } + + } + +#ifdef topo_debug + if (myrank == 0) { + printf("Topology-aware CB Selection (type %d): nb_aggr is %d, and ranklist is:", select_type, nb_aggr); + for (r=0;r 0)) { + for (rank=0;rankshared); + HDassert(H5F_INTENT(f) & H5F_ACC_RDWR); + HDassert(H5F_addr_defined(addr)); + HDassert(buf); + + /* Check for attempting I/O on 'temporary' file address */ + if(NULL == (fspace = (H5S_t *)H5I_object_verify(file_space, H5I_DATASPACE))) + HGOTO_ERROR(H5E_IO, H5E_BADTYPE, FAIL, "file dataspace ID not valid") + if (!(fspace==NULL)){ + 
if((snpoints = H5S_GET_EXTENT_NPOINTS(fspace)) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "can't retrieve # of elements for file dataspace") + npoints = (hsize_t)snpoints; + if(H5F_addr_le(f->shared->tmp_addr, (addr + (npoints * elmt_size)))) + HGOTO_ERROR(H5E_IO, H5E_BADRANGE, FAIL, "attempting I/O in temporary file space") + } + + /* Treat global heap as raw data */ + map_type = (type == H5FD_MEM_GHEAP) ? H5FD_MEM_DRAW : type; + file_ptr = (f->shared->lf); + + /* Pass I/O down to next layer */ + if(H5FD_select_write(file_ptr, map_type, file_space, mem_space, elmt_size, addr, buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "selection write failed") + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5F_select_write() */ + +/*------------------------------------------------------------------------- + * Function: H5F_select_read + * + * Purpose: Reads selected data from a file/server/etc into a buffer. + * The address is relative to the base address for the file. + * The file and memory selection determine the elements to read + * from the file, and the elmt_size is the size of each element + * in bytes. The address in the file and the buffer in memory + * are assumed to point at the multidimensional array in the + * file and memory spaces, respectively. + * + * Return: Non-negative on success/Negative on failure + * + * Programmer: Quincey Koziol + * koziol@lbl.gov + * Nov 8 2017 + * + *------------------------------------------------------------------------- + */ +herr_t +H5F_select_read(const H5F_t *f, H5FD_mem_t type, hid_t file_space, + hid_t mem_space, size_t elmt_size, haddr_t addr, void *buf /*out*/) +{ + H5FD_mem_t map_type; /* Mapped memory type */ + H5S_t *fspace; /* File dataspace */ + hsize_t npoints; /* # of elements in dataspace */ + hssize_t snpoints; /* # of elements in dataspace (signed) */ + H5FD_t *file_ptr; /* H5FD file pointer */ + herr_t ret_value = SUCCEED; /* Return value */ + + FUNC_ENTER_NOAPI(FAIL) + + /* Sanity checks */ + HDassert(f); + HDassert(f->shared); + HDassert(H5F_addr_defined(addr)); + HDassert(buf); + + /* Check for attempting I/O on 'temporary' file address */ + if(NULL == (fspace = (H5S_t *)H5I_object_verify(file_space, H5I_DATASPACE))) + HGOTO_ERROR(H5E_IO, H5E_BADTYPE, FAIL, "file dataspace ID not valid") + if (!(fspace==NULL)){ + if((snpoints = H5S_GET_EXTENT_NPOINTS(fspace)) < 0) + HGOTO_ERROR(H5E_IO, H5E_CANTGET, FAIL, "can't retrieve # of elements for file dataspace") + npoints = (hsize_t)snpoints; + if(H5F_addr_le(f->shared->tmp_addr, (addr + (npoints * elmt_size)))) + HGOTO_ERROR(H5E_IO, H5E_BADRANGE, FAIL, "attempting I/O in temporary file space") + } + + /* Treat global heap as raw data */ + map_type = (type == H5FD_MEM_GHEAP) ? H5FD_MEM_DRAW : type; + file_ptr = (f->shared->lf); + + /* Pass I/O down to next layer */ + if(H5FD_select_read(file_ptr, map_type, file_space, mem_space, elmt_size, addr, buf) < 0) + HGOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "selection read failed") + +done: + FUNC_LEAVE_NOAPI(ret_value) +} /* end H5F_select_read() */ + /*------------------------------------------------------------------------- * Function: H5F_flush_tagged_metadata * - * Purpose: Flushes metadata with specified tag in the metadata cache + * Purpose: Flushes metadata with specified tag in the metadata cache * to disk. 
* * Return: Non-negative on success/Negative on failure @@ -327,4 +450,3 @@ H5F_get_checksums(const uint8_t *buf, size_t buf_size, uint32_t *s_chksum/*out*/ FUNC_LEAVE_NOAPI(SUCCEED) } /* end H5F_get_chksums() */ - diff --git a/src/H5Fprivate.h b/src/H5Fprivate.h index 83513a5..b6878b2 100644 --- a/src/H5Fprivate.h +++ b/src/H5Fprivate.h @@ -479,11 +479,11 @@ typedef struct H5F_t H5F_t; #define H5F_ACS_SDATA_BLOCK_SIZE_NAME "sdata_block_size" /* Minimum "small data" allocation block size (when aggregating "small" raw data allocations) */ #define H5F_ACS_GARBG_COLCT_REF_NAME "gc_ref" /* Garbage-collect references */ #define H5F_ACS_FILE_DRV_NAME "vfd_info" /* File driver ID & info */ -#define H5F_ACS_VOL_CONN_NAME "vol_connector_info" /* VOL connector ID & info */ +#define H5F_ACS_VOL_DRV_NAME "vol_driver_info" /* VOL driver ID & info */ #define H5F_ACS_CLOSE_DEGREE_NAME "close_degree" /* File close degree */ #define H5F_ACS_FAMILY_OFFSET_NAME "family_offset" /* Offset position in file for family file driver */ #define H5F_ACS_FAMILY_NEWSIZE_NAME "family_newsize" /* New member size of family driver. (private property only used by h5repart) */ -#define H5F_ACS_FAMILY_TO_SINGLE_NAME "family_to_single" /* Whether to convert family to a single-file driver. (private property only used by h5repart) */ +#define H5F_ACS_FAMILY_TO_SEC2_NAME "family_to_sec2" /* Whether to convert family to sec2 driver. (private property only used by h5repart) */ #define H5F_ACS_MULTI_TYPE_NAME "multi_type" /* Data type in multi file driver */ #define H5F_ACS_LIBVER_LOW_BOUND_NAME "libver_low_bound" /* 'low' bound of library format versions */ #define H5F_ACS_LIBVER_HIGH_BOUND_NAME "libver_high_bound" /* 'high' bound of library format versions */ @@ -716,7 +716,6 @@ typedef enum H5F_prefix_open_t { /* Private functions */ H5_DLL H5F_t *H5F_open(const char *name, unsigned flags, hid_t fcpl_id, hid_t fapl_id); H5_DLL herr_t H5F_try_close(H5F_t *f, hbool_t *was_closed/*out*/); -H5_DLL hid_t H5F_get_file_id(hid_t obj_id, H5I_type_t id_type); /* Functions that retrieve values from the file struct */ H5_DLL H5F_libver_t H5F_get_low_bound(const H5F_t *f); @@ -735,7 +734,7 @@ H5_DLL H5F_t *H5F_get_parent(const H5F_t *f); H5_DLL unsigned H5F_get_nmounts(const H5F_t *f); H5_DLL unsigned H5F_get_read_attempts(const H5F_t *f); H5_DLL hid_t H5F_get_access_plist(H5F_t *f, hbool_t app_ref); -H5_DLL hid_t H5F_get_id(H5F_t *file); +H5_DLL hid_t H5F_get_id(H5F_t *file, hbool_t app_ref); H5_DLL herr_t H5F_get_obj_count(const H5F_t *f, unsigned types, hbool_t app_ref, size_t *obj_id_count_ptr); H5_DLL herr_t H5F_get_obj_ids(const H5F_t *f, unsigned types, size_t max_objs, hid_t *oid_list, hbool_t app_ref, size_t *obj_id_count_ptr); H5_DLL hsize_t H5F_get_pgend_meta_thres(const H5F_t *f); @@ -797,6 +796,14 @@ H5_DLL herr_t H5F_flush_mounts(H5F_t *f); H5_DLL herr_t H5F_block_read(H5F_t *f, H5FD_mem_t type, haddr_t addr, size_t size, void *buf/*out*/); H5_DLL herr_t H5F_block_write(H5F_t *f, H5FD_mem_t type, haddr_t addr, size_t size, const void *buf); +/* Functions that operate on selections of elements wrt super block */ +H5_DLL herr_t H5F_select_read(const H5F_t *f, H5FD_mem_t type, + hid_t file_space, hid_t mem_space, size_t elmt_size, + haddr_t addr, void *buf/*out*/); +H5_DLL herr_t H5F_select_write(const H5F_t *f, H5FD_mem_t type, + hid_t file_space, hid_t mem_space, size_t elmt_size, + haddr_t addr, const void *buf); + /* Functions that flush or evict */ H5_DLL herr_t H5F_flush_tagged_metadata(H5F_t *f, haddr_t tag); H5_DLL herr_t 
H5F_evict_tagged_metadata(H5F_t *f, haddr_t tag); @@ -856,4 +863,3 @@ H5_DLL herr_t H5F_cwfs_remove_heap(H5F_file_t *shared, struct H5HG_heap_t *heap) H5_DLL herr_t H5F_debug(H5F_t *f, FILE * stream, int indent, int fwidth); #endif /* _H5Fprivate_H */ - diff --git a/src/H5Sall.c b/src/H5Sall.c index 3b77b98..45a8846 100644 --- a/src/H5Sall.c +++ b/src/H5Sall.c @@ -31,9 +31,9 @@ /* Selection callbacks */ static herr_t H5S__all_copy(H5S_t *dst, const H5S_t *src, hbool_t share_selection); -static herr_t H5S__all_get_seq_list(const H5S_t *space, unsigned flags, - H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, - size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); +//static herr_t H5S__all_get_seq_list(const H5S_t *space, unsigned flags, +// H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, +// size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); static herr_t H5S__all_release(H5S_t *space); static htri_t H5S__all_is_valid(const H5S_t *space); static hssize_t H5S__all_serial_size(const H5S_t *space); @@ -987,7 +987,7 @@ done: EXAMPLES REVISION LOG --------------------------------------------------------------------------*/ -static herr_t +herr_t H5S__all_get_seq_list(const H5S_t H5_ATTR_UNUSED *space, unsigned H5_ATTR_UNUSED flags, H5S_sel_iter_t *iter, size_t H5_ATTR_UNUSED maxseq, size_t maxelem, size_t *nseq, size_t *nelem, hsize_t *off, size_t *len) @@ -1028,4 +1028,3 @@ H5S__all_get_seq_list(const H5S_t H5_ATTR_UNUSED *space, unsigned H5_ATTR_UNUSED FUNC_LEAVE_NOAPI(SUCCEED) } /* end H5S__all_get_seq_list() */ - diff --git a/src/H5Shyper.c b/src/H5Shyper.c index ed3fa45..c989083 100644 --- a/src/H5Shyper.c +++ b/src/H5Shyper.c @@ -75,9 +75,9 @@ static hsize_t H5S__hyper_get_clip_extent_real(const H5S_t *clip_space, /* Selection callbacks */ static herr_t H5S__hyper_copy(H5S_t *dst, const H5S_t *src, hbool_t share_selection); -static herr_t H5S__hyper_get_seq_list(const H5S_t *space, unsigned flags, - H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, - size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); +//static herr_t H5S__hyper_get_seq_list(const H5S_t *space, unsigned flags, +// H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, +// size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); static herr_t H5S__hyper_release(H5S_t *space); static htri_t H5S__hyper_is_valid(const H5S_t *space); static hssize_t H5S__hyper_serial_size(const H5S_t *space); @@ -4522,22 +4522,22 @@ H5S__hyper_project_simple(const H5S_t *base_space, H5S_t *new_space, hsize_t *of /* Copy the diminfo */ while(base_space_dim < base_space->extent.rank) { - new_space->select.sel_info.hslab->app_diminfo[new_space_dim].start = + new_space->select.sel_info.hslab->app_diminfo[new_space_dim].start = base_space->select.sel_info.hslab->app_diminfo[base_space_dim].start; - new_space->select.sel_info.hslab->app_diminfo[new_space_dim].stride = + new_space->select.sel_info.hslab->app_diminfo[new_space_dim].stride = base_space->select.sel_info.hslab->app_diminfo[base_space_dim].stride; - new_space->select.sel_info.hslab->app_diminfo[new_space_dim].count = + new_space->select.sel_info.hslab->app_diminfo[new_space_dim].count = base_space->select.sel_info.hslab->app_diminfo[base_space_dim].count; - new_space->select.sel_info.hslab->app_diminfo[new_space_dim].block = + new_space->select.sel_info.hslab->app_diminfo[new_space_dim].block = base_space->select.sel_info.hslab->app_diminfo[base_space_dim].block; - new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].start = + 
new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].start = base_space->select.sel_info.hslab->opt_diminfo[base_space_dim].start; new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].stride = base_space->select.sel_info.hslab->opt_diminfo[base_space_dim].stride; - new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].count = + new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].count = base_space->select.sel_info.hslab->opt_diminfo[base_space_dim].count; - new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].block = + new_space->select.sel_info.hslab->opt_diminfo[new_space_dim].block = base_space->select.sel_info.hslab->opt_diminfo[base_space_dim].block; /* Advance to next dimensions */ @@ -7224,7 +7224,7 @@ H5S_select_hyperslab (H5S_t *space, H5S_seloper_t op, /* Check for unlimited dimension */ for(u = 0; uextent.rank; u++) if((count[u] == H5S_UNLIMITED) || (block[u] == H5S_UNLIMITED)) { - if(unlim_dim >= 0) + if(unlim_dim >= 0) HGOTO_ERROR(H5E_DATASPACE, H5E_UNSUPPORTED, FAIL, "cannot have more than one unlimited dimension in selection") else { if(count[u] == block[u] /* == H5S_UNLIMITED */) @@ -9086,7 +9086,7 @@ H5S__hyper_get_seq_list_single(const H5S_t *space, H5S_sel_iter_t *iter, EXAMPLES REVISION LOG --------------------------------------------------------------------------*/ -static herr_t +herr_t H5S__hyper_get_seq_list(const H5S_t *space, unsigned H5_ATTR_UNUSED flags, H5S_sel_iter_t *iter, size_t maxseq, size_t maxelem, size_t *nseq, size_t *nelem, hsize_t *off, size_t *len) @@ -9314,7 +9314,7 @@ H5S__hyper_project_intersection(const H5S_t *src_space, const H5S_t *dst_space, HDassert(dst_space); HDassert(src_intersect_space); HDassert(proj_space); - + /* Assert that src_space and src_intersect_space have same extent and there * are no point selections */ HDassert(H5S_GET_EXTENT_NDIMS(src_space) @@ -9483,7 +9483,7 @@ H5S__hyper_project_intersection(const H5S_t *src_space, const H5S_t *dst_space, * selection and advance any sequences we complete */ if(ss_off[ss_i] >= sis_off[sis_i]) int_sel_off = ss_sel_off; - else + else int_sel_off = sis_off[sis_i] - ss_off[ss_i] + ss_sel_off; if((ss_off[ss_i] + (hsize_t)ss_len[ss_i]) <= (sis_off[sis_i] + (hsize_t)sis_len[sis_i])) { @@ -10429,4 +10429,3 @@ H5Sget_regular_hyperslab(hid_t spaceid, hsize_t start[], hsize_t stride[], done: FUNC_LEAVE_API(ret_value) } /* H5Sget_regular_hyperslab() */ - diff --git a/src/H5Smpio.c b/src/H5Smpio.c index 935d279..78d1b4f 100644 --- a/src/H5Smpio.c +++ b/src/H5Smpio.c @@ -41,12 +41,12 @@ static herr_t H5S_mpio_all_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type, int *count, hbool_t *is_derived_type); static herr_t H5S_mpio_none_type(MPI_Datatype *new_type, int *count, hbool_t *is_derived_type); -static herr_t H5S_mpio_create_point_datatype(size_t elmt_size, hsize_t num_points, +static herr_t H5S_mpio_create_point_datatype(size_t elmt_size, hsize_t num_points, MPI_Aint *disp, MPI_Datatype *new_type); static herr_t H5S_mpio_point_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type, int *count, hbool_t *is_derived_type, hbool_t do_permute, hsize_t **permute_map, hbool_t *is_permuted); -static herr_t H5S_mpio_permute_type(const H5S_t *space, size_t elmt_size, +static herr_t H5S_mpio_permute_type(const H5S_t *space, size_t elmt_size, hsize_t **permute_map, MPI_Datatype *new_type, int *count, hbool_t *is_derived_type); static herr_t H5S_mpio_hyper_type(const H5S_t *space, size_t elmt_size, @@ -198,9 +198,9 @@ 
H5S_mpio_none_type(MPI_Datatype *new_type, int *count, hbool_t *is_derived_type) * *------------------------------------------------------------------------- */ -static herr_t +static herr_t H5S_mpio_create_point_datatype (size_t elmt_size, hsize_t num_points, - MPI_Aint *disp, MPI_Datatype *new_type) + MPI_Aint *disp, MPI_Datatype *new_type) { MPI_Datatype elmt_type; /* MPI datatype for individual element */ hbool_t elmt_type_created = FALSE; /* Whether the element MPI datatype was created */ @@ -239,7 +239,7 @@ H5S_mpio_create_point_datatype (size_t elmt_size, hsize_t num_points, if(MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code) } - else { + else { /* use LARGE_DATATYPE:: * We'll create an hindexed_block type for every 2G point count and then combine * those and any remaining points into a single large datatype. @@ -373,7 +373,7 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5S_mpio_point_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type, +H5S_mpio_point_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type, int *count, hbool_t *is_derived_type, hbool_t do_permute, hsize_t **permute, hbool_t *is_permuted) { @@ -410,19 +410,19 @@ H5S_mpio_point_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type disp[u] = H5VM_array_offset(space->extent.rank, space->extent.size, curr->pnt); disp[u] *= elmt_size; - /* This is a File Space used to set the file view, so adjust the displacements + /* This is a File Space used to set the file view, so adjust the displacements * to have them monotonically non-decreasing. - * Generate the permutation array by indicating at each point being selected, - * the position it will shifted in the new displacement. Example: - * Suppose 4 points with corresponding are selected - * Pt 1: disp=6 ; Pt 2: disp=3 ; Pt 3: disp=0 ; Pt 4: disp=4 + * Generate the permutation array by indicating at each point being selected, + * the position it will shifted in the new displacement. Example: + * Suppose 4 points with corresponding are selected + * Pt 1: disp=6 ; Pt 2: disp=3 ; Pt 3: disp=0 ; Pt 4: disp=4 * The permute map to sort the displacements in order will be: * point 1: map[0] = L, indicating that this point is not moved (1st point selected) - * point 2: map[1] = 0, indicating that this point is moved to the first position, + * point 2: map[1] = 0, indicating that this point is moved to the first position, * since disp_pt1(6) > disp_pt2(3) - * point 3: map[2] = 0, move to position 0, bec it has the lowest disp between + * point 3: map[2] = 0, move to position 0, bec it has the lowest disp between * the points selected so far. - * point 4: map[3] = 2, move the 2nd position since point 1 has a higher disp, + * point 4: map[3] = 2, move the 2nd position since point 1 has a higher disp, * but points 2 and 3 have lower displacements. 
*/ if(do_permute) { @@ -447,7 +447,7 @@ H5S_mpio_point_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type HDmemmove(disp + m + 1, disp + m, (u - m) * sizeof(MPI_Aint)); disp[m] = temp; } /* end if */ - (*permute)[u] = m; + (*permute)[u] = m; } /* end if */ else (*permute)[u] = num_points; @@ -508,7 +508,7 @@ done: *------------------------------------------------------------------------- */ static herr_t -H5S_mpio_permute_type(const H5S_t *space, size_t elmt_size, hsize_t **permute, +H5S_mpio_permute_type(const H5S_t *space, size_t elmt_size, hsize_t **permute, MPI_Datatype *new_type, int *count, hbool_t *is_derived_type) { MPI_Aint *disp = NULL; /* Datatype displacement for each point*/ @@ -571,12 +571,12 @@ H5S_mpio_permute_type(const H5S_t *space, size_t elmt_size, hsize_t **permute, /* Set the displacement of the current point */ disp[u] = curr_off; - /* This is a memory displacement, so for each point selected, + /* This is a memory displacement, so for each point selected, * apply the map that was generated by the file selection */ if((*permute)[u] != num_points) { MPI_Aint temp = disp[u]; - HDmemmove(disp + (*permute)[u] + 1, disp + (*permute)[u], + HDmemmove(disp + (*permute)[u] + 1, disp + (*permute)[u], (u - (*permute)[u]) * sizeof(MPI_Aint)); disp[(*permute)[u]] = temp; } /* end if */ @@ -795,7 +795,7 @@ H5S_mpio_hyper_type(const H5S_t *space, size_t elmt_size, #endif /* LARGE_DATATYPE:: - * Check if the number of elements to form the inner type fits into a 32 bit integer. + * Check if the number of elements to form the inner type fits into a 32 bit integer. * If yes then just create the innertype with MPI_Type_contiguous. * Otherwise create a compound datatype by iterating as many times as needed * for the innertype to be created. @@ -848,8 +848,8 @@ H5S_mpio_hyper_type(const H5S_t *space, size_t elmt_size, HMPI_GOTO_ERROR(FAIL, "couldn't create MPI vector type", mpi_code) } else { - /* Things get a bit more complicated and require LARGE_DATATYPE processing - * There are two MPI datatypes that need to be created: + /* Things get a bit more complicated and require LARGE_DATATYPE processing + * There are two MPI datatypes that need to be created: * 1) an internal contiguous block; and * 2) a collection of elements where an element is a contiguous block(1). * Remember that the input arguments to the MPI-IO functions use integer @@ -863,30 +863,23 @@ H5S_mpio_hyper_type(const H5S_t *space, size_t elmt_size, MPI_Datatype block_type; /* create a contiguous datatype inner_type x number of BLOCKS. - * Again we need to check that the number of BLOCKS can fit into + * Again we need to check that the number of BLOCKS can fit into * a 32 bit integer */ if (bigio_count < d[i].block) { - if (H5S_mpio_create_large_type(d[i].block, 0, inner_type, + if (H5S_mpio_create_large_type(d[i].block, 0, inner_type, &block_type) < 0) { HGOTO_ERROR(H5E_DATASPACE, H5E_BADTYPE, FAIL, "couldn't ccreate a large block datatype in hyper selection") } } else { - if(MPI_SUCCESS != (mpi_code = MPI_Type_contiguous((int)d[i].block, - inner_type, + if(MPI_SUCCESS != (mpi_code = MPI_Type_contiguous((int)d[i].block, + inner_type, &block_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) } - /* As of version 4.0, OpenMPI now turns off MPI-1 API calls by default, - * so we're using the MPI-2 version even though we don't need the lb - * value. 
- */ - { - MPI_Aint unused_lb_arg; - MPI_Type_get_extent(inner_type, &unused_lb_arg, &inner_extent); - } + MPI_Type_extent (inner_type, &inner_extent); stride_in_bytes = inner_extent * (MPI_Aint)d[i].strid; /* If the element count is larger than what a 32 bit integer can hold, @@ -979,7 +972,42 @@ done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5S_mpio_hyper_type() */ - + +/* This function allows the rank and extent of the space to accessed from the H5S_t structure + * from modules (like FD) outside of the space module. + */ +herr_t +H5S_mpio_return_space_rank_and_extent(const H5S_t *space, unsigned *rank, hsize_t *extent) { + + *rank = space->extent.rank; + *extent = 1; + for (int i=0;i<(*rank);i++) { + *extent *= space->extent.size[i]; + } + + herr_t ret_value = SUCCEED; + return ret_value; +} + +/* This function allows the extent and select type of the space to be gotten from the H5S_t structure + * from modules like FD outside of the space module. + */ +herr_t +H5S_mpio_return_space_extent_and_select_type(const H5S_t *space, hbool_t *is_permuted, hbool_t *is_regular, H5S_class_t *space_extent_type, H5S_sel_type *space_sel_type) { + + herr_t ret_value = SUCCEED; /* Return value */ + + FUNC_ENTER_NOAPI_NOINIT + + *space_extent_type = H5S_GET_EXTENT_TYPE(space); + *space_sel_type = H5S_GET_SELECT_TYPE(space); + *is_regular = H5S_SELECT_IS_REGULAR(space); + +done: + FUNC_LEAVE_NOAPI(ret_value) +} + + /*------------------------------------------------------------------------- * Function: H5S_mpio_span_hyper_type * @@ -997,7 +1025,7 @@ done: * * Modifications: * Mohamad Chaarawi - * Adding support for large datatypes (beyond the limit of a + * Adding support for large datatypes (beyond the limit of a * 32 bit integer. *------------------------------------------------------------------------- */ @@ -1156,8 +1184,8 @@ H5S_obtain_datatype(const hsize_t *down, H5S_hyper_span_t *span, } } else { - if(MPI_SUCCESS != (mpi_code = MPI_Type_contiguous((int)blocklen[i], - *elmt_type, + if(MPI_SUCCESS != (mpi_code = MPI_Type_contiguous((int)blocklen[i], + *elmt_type, &temp_type))) HMPI_GOTO_ERROR(FAIL, "MPI_Type_contiguous failed", mpi_code) } @@ -1181,11 +1209,11 @@ H5S_obtain_datatype(const hsize_t *down, H5S_hyper_span_t *span, *span_type = outer_type; } - if (outer_type != MPI_DATATYPE_NULL) + if (outer_type != MPI_DATATYPE_NULL) MPI_Type_free(&outer_type); /* temp_type shouldn't be freed here... * Note that we have simply copied it above (not MPI_Type_dup) - * into the 'span_type' argument of the caller. + * into the 'span_type' argument of the caller. * The caller needs to deal with it there! */ } @@ -1312,7 +1340,7 @@ done: *------------------------------------------------------------------------- */ herr_t -H5S_mpio_space_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type, +H5S_mpio_space_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type, int *count, hbool_t *is_derived_type, hbool_t do_permute, hsize_t **permute_map, hbool_t *is_permuted) { @@ -1333,7 +1361,7 @@ H5S_mpio_space_type(const H5S_t *space, size_t elmt_size, MPI_Datatype *new_type * out-of-order point selection, then permute this selection which * should be a memory selection to match the file space permutation. 
*/ - if(TRUE == *is_permuted) { + if(TRUE == *is_permuted) { switch(H5S_GET_SELECT_TYPE(space)) { case H5S_SEL_NONE: if(H5S_mpio_none_type(new_type, count, is_derived_type) < 0) @@ -1409,7 +1437,7 @@ done: /*------------------------------------------------------------------------- * Function: H5S_mpio_create_large_type * - * Purpose: Create a large datatype of size larger than what a 32 bit integer + * Purpose: Create a large datatype of size larger than what a 32 bit integer * can hold. * * Return: non-negative on success, negative on failure. @@ -1507,14 +1535,7 @@ static herr_t H5S_mpio_create_large_type (hsize_t num_elements, } } - /* As of version 4.0, OpenMPI now turns off MPI-1 API calls by default, - * so we're using the MPI-2 version even though we don't need the lb - * value. - */ - { - MPI_Aint unused_lb_arg; - MPI_Type_get_extent(old_type, &unused_lb_arg, &old_extent); - } + MPI_Type_extent (old_type, &old_extent); /* Set up the arguments for MPI_Type_struct constructor */ type[0] = outer_type; @@ -1546,4 +1567,3 @@ done: } /* end H5S_mpio_create_large_type */ #endif /* H5_HAVE_PARALLEL */ - diff --git a/src/H5Spoint.c b/src/H5Spoint.c index 9924920..bd03d68 100644 --- a/src/H5Spoint.c +++ b/src/H5Spoint.c @@ -33,9 +33,9 @@ /* Selection callbacks */ static herr_t H5S_point_copy(H5S_t *dst, const H5S_t *src, hbool_t share_selection); -static herr_t H5S_point_get_seq_list(const H5S_t *space, unsigned flags, - H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, - size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); +//static herr_t H5S_point_get_seq_list(const H5S_t *space, unsigned flags, +// H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, +// size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); static herr_t H5S_point_release(H5S_t *space); static htri_t H5S_point_is_valid(const H5S_t *space); static hssize_t H5S_point_serial_size(const H5S_t *space); @@ -461,7 +461,7 @@ done: /* Release possible linked list of nodes */ while(top) { - curr = top->next; + curr = top->next; H5MM_xfree(top->pnt); top = H5FL_FREE(H5S_pnt_node_t, top); top = curr; @@ -1433,7 +1433,7 @@ H5S_point_project_scalar(const H5S_t *space, hsize_t *offset) HGOTO_ERROR(H5E_DATASPACE, H5E_BADRANGE, FAIL, "point selection of one element has more than one node!") /* Calculate offset of selection in projected buffer */ - *offset = H5VM_array_offset(space->extent.rank, space->extent.size, node->pnt); + *offset = H5VM_array_offset(space->extent.rank, space->extent.size, node->pnt); done: FUNC_LEAVE_NOAPI(ret_value) @@ -1487,7 +1487,7 @@ H5S_point_project_simple(const H5S_t *base_space, H5S_t *new_space, hsize_t *off /* Calculate offset of selection in projected buffer */ HDmemset(block, 0, sizeof(block)); HDmemcpy(block, base_space->select.sel_info.pnt_lst->head->pnt, sizeof(hsize_t) * rank_diff); - *offset = H5VM_array_offset(base_space->extent.rank, base_space->extent.size, block); + *offset = H5VM_array_offset(base_space->extent.rank, base_space->extent.size, block); /* Iterate through base space's point nodes, copying the point information */ base_node = base_space->select.sel_info.pnt_lst->head; @@ -1659,7 +1659,7 @@ done: EXAMPLES REVISION LOG --------------------------------------------------------------------------*/ -static herr_t +herr_t H5S_point_get_seq_list(const H5S_t *space, unsigned flags, H5S_sel_iter_t *iter, size_t maxseq, size_t maxelem, size_t *nseq, size_t *nelem, hsize_t *off, size_t *len) @@ -1764,4 +1764,3 @@ H5S_point_get_seq_list(const H5S_t *space, unsigned flags, 
H5S_sel_iter_t *iter, done: FUNC_LEAVE_NOAPI(ret_value) } /* end H5S_point_get_seq_list() */ - diff --git a/src/H5Sprivate.h b/src/H5Sprivate.h index 32ac51a..899a2c9 100644 --- a/src/H5Sprivate.h +++ b/src/H5Sprivate.h @@ -314,10 +314,35 @@ H5_DLL herr_t H5S_mpio_space_type(const H5S_t *space, size_t elmt_size, /* out: */ MPI_Datatype *new_type, int *count, hbool_t *is_derived_type, - hbool_t do_permute, + hbool_t do_permute, hsize_t **permute_map, hbool_t * is_permuted); + +/* + * Buffer-flattening (flatbuf) representation of a selection, used in place of derived MPI datatypes. + * Block lengths, offsets, extent, and size are all expressed in bytes. + */ +typedef struct H5S_flatbuf_t { + hsize_t count; /* number of contiguous blocks */ + size_t *blocklens; /* array of contiguous block lengths (bytes) */ + hsize_t *indices; /* array of byte offsets of each block */ + hsize_t extent; /* offset range for one instance of this flatbuf */ + hsize_t size; /* number of bytes of block data */ +} H5S_flatbuf_t; +H5_DLL herr_t H5S__hyper_get_seq_list(const H5S_t *space, unsigned H5_ATTR_UNUSED flags, H5S_sel_iter_t *iter, + size_t maxseq, size_t maxelem, size_t *nseq, size_t *nelem, + hsize_t *off, size_t *len); +H5_DLL herr_t H5S__all_get_seq_list(const H5S_t *space, unsigned flags, + H5S_sel_iter_t *iter, size_t maxseq, size_t maxbytes, + size_t *nseq, size_t *nbytes, hsize_t *off, size_t *len); +H5_DLL herr_t H5S_point_get_seq_list(const H5S_t *space, unsigned flags, H5S_sel_iter_t *iter, + size_t maxseq, size_t maxelem, size_t *nseq, size_t *nelem, + hsize_t *off, size_t *len); + +H5_DLL herr_t H5S_mpio_return_space_rank_and_extent(const H5S_t *space, unsigned *rank, hsize_t *extent); + +H5_DLL herr_t H5S_mpio_return_space_extent_and_select_type(const H5S_t *space, hbool_t *is_permuted, hbool_t *is_regular, H5S_class_t *space_extent_type, H5S_sel_type *space_sel_type); + #endif /* H5_HAVE_PARALLEL */ #endif /* _H5Sprivate_H */ - -- cgit v0.12
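
The aggregator election in topology_aware_list_serial() reduces to a loop of MPI_MINLOC reductions: every rank advertises a cost, the communicator agrees on the cheapest rank, and the winner is penalized so later rounds choose someone else. The sketch below shows only that pattern and is not part of the patch; the cost formula, the two-aggregator setup, and the penalty constant are invented, and the node-distance (SMALL_PENALTY) term used in the real code is omitted.

/* Minimal sketch of the MINLOC-based aggregator election (placeholder cost model) */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int rank, nprocs, nb_aggr = 2;
    struct { double cost; int rank; } my, winner;   /* layout required by MPI_DOUBLE_INT */

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    int    *agg_list = calloc((size_t)nb_aggr, sizeof(int));
    double  penalty  = 0.0;                          /* grows once this rank is chosen */

    for (int a = 0; a < nb_aggr; a++) {
        my.cost = (double)((rank * 7 + a) % nprocs) + penalty; /* placeholder cost */
        my.rank = rank;

        /* Everyone learns which rank currently has the lowest cost */
        MPI_Allreduce(&my, &winner, 1, MPI_DOUBLE_INT, MPI_MINLOC, MPI_COMM_WORLD);
        agg_list[a] = winner.rank;

        if (winner.rank == rank)
            penalty += 1.0e6;                        /* stand-in for LARGE_PENALTY */
    }

    if (rank == 0) {
        printf("aggregator ranks:");
        for (int a = 0; a < nb_aggr; a++)
            printf(" %d", agg_list[a]);
        printf("\n");
    }
    free(agg_list);
    MPI_Finalize();
    return 0;
}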
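
add_chunk() charges each contiguous file chunk to the aggregators that own its bytes under the round-robin collective-buffer layout shown in the "Assumption" diagrams. The standalone program below performs the same accounting iteratively; tally_chunk, the 4-byte buffer size, and the example chunk list are invented for illustration.

#include <stdio.h>
#include <stdint.h>

/* Iterative equivalent of the splitting done by add_chunk(): walk a contiguous
 * file chunk through buffer_size-sized slots owned round-robin by nb_aggr
 * aggregators, charging each slice to its owner. */
static void tally_chunk(int64_t datalen, int64_t offset, int64_t buffer_size,
                        int64_t nb_aggr, int64_t *tally)
{
    while (datalen > 0) {
        int64_t in_round   = offset % (nb_aggr * buffer_size);    /* position within the round */
        int64_t agg_index  = in_round / buffer_size;               /* aggregator owning this slice */
        int64_t amount_add = buffer_size - (offset % buffer_size); /* bytes left in the slot */
        if (amount_add > datalen)
            amount_add = datalen;
        tally[agg_index] += amount_add;
        offset  += amount_add;
        datalen -= amount_add;
    }
}

int main(void)
{
    const int64_t buffer_size = 4;   /* toy 4-byte collective buffers */
    const int64_t nb_aggr     = 2;   /* two aggregators */
    int64_t       tally[2]    = {0, 0};

    tally_chunk(10, 0, buffer_size, nb_aggr, tally);   /* 10 bytes at offset 0  */
    tally_chunk(3, 13, buffer_size, nb_aggr, tally);   /*  3 bytes at offset 13 */

    /* Round-robin layout: bytes 0-3 -> agg 0, 4-7 -> agg 1, 8-11 -> agg 0, ...
     * so agg 0 ends up with 6 bytes and agg 1 with 7 bytes. */
    printf("agg 0: %lld bytes, agg 1: %lld bytes\n",
           (long long)tally[0], (long long)tally[1]);
    return 0;
}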
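
With fd_mapping == 1, the DATA branch of topology_aware_ranklist() assumes each aggregator owns one contiguous file domain of size (max_off - min_off) / nb_aggr and charges each chunk to the domain holding its start plus, if it spills over, the next domain. The single-process sketch below reproduces that arithmetic; the offsets, lengths, and two-aggregator setup are invented, the MPI_Allreduce over min/max offsets is replaced by a local scan, and TMIN/TMAX are defined locally to keep it self-contained.

#include <stdio.h>
#include <stdint.h>

#define TMIN(a,b) ((a) < (b) ? (a) : (b))
#define TMAX(a,b) ((a) > (b) ? (a) : (b))

int main(void)
{
    const int64_t nb_aggr   = 2;
    int64_t offsets[]   = { 0, 60, 90 };   /* invented chunk offsets  */
    int64_t data_lens[] = { 30, 30, 10 };  /* invented chunk lengths  */
    const int n = 3;

    /* Global offset extremes (single rank here; the patch uses MPI_Allreduce) */
    int64_t min_off = offsets[0], max_off = offsets[0] + data_lens[0];
    for (int r = 1; r < n; r++) {
        if (offsets[r] < min_off) min_off = offsets[r];
        if (offsets[r] + data_lens[r] > max_off) max_off = offsets[r] + data_lens[r];
    }

    int64_t fd_size = (max_off - min_off) / nb_aggr;   /* 100 / 2 = 50 bytes per domain */
    int64_t per_aggr[2] = { 0, 0 };

    for (int r = 0; r < n; r++) {
        int64_t st_agg = (offsets[r] - min_off) / fd_size;               /* starting domain  */
        int64_t in_0   = ((st_agg + 1) * fd_size) - (offsets[r] - min_off); /* bytes in it   */
        in_0 = TMIN(in_0, data_lens[r]);
        int64_t in_1 = TMAX(0, data_lens[r] - in_0);                     /* spillover bytes  */
        per_aggr[st_agg]                 += in_0;
        per_aggr[(st_agg + 1) % nb_aggr] += in_1;
    }

    /* Expected: domain 0 (bytes 0-49) gets 30, domain 1 (bytes 50-99) gets 40 */
    printf("aggr 0: %lld bytes, aggr 1: %lld bytes\n",
           (long long)per_aggr[0], (long long)per_aggr[1]);
    return 0;
}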
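
The H5S_flatbuf_t struct added to H5Sprivate.h describes a selection as a flat list of contiguous byte runs. The sketch below fills an analogous local struct (flatbuf_example_t, using plain C integer types rather than hsize_t) for a hypothetical 2x3 hyperslab of a 4x8 dataset of 8-byte elements; treating extent as the byte span of one full dataspace instance is an assumption, not something the patch states.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

/* Local mirror of H5S_flatbuf_t so the example stands alone */
typedef struct {
    uint64_t  count;      /* number of contiguous blocks */
    size_t   *blocklens;  /* length of each block, in bytes */
    uint64_t *indices;    /* starting byte offset of each block */
    uint64_t  extent;     /* byte span of one dataspace instance (assumed meaning) */
    uint64_t  size;       /* total bytes described by the blocks */
} flatbuf_example_t;

int main(void)
{
    const uint64_t nrows_ds = 4, ncols = 8, elmt = 8;  /* 4x8 dataset, 8-byte elements */
    const uint64_t row0 = 1, col0 = 2;                 /* hyperslab start (row, col)   */
    const uint64_t brows = 2, bcols = 3;               /* hyperslab block: 2 x 3       */

    flatbuf_example_t fb;
    fb.count     = brows;                              /* one contiguous run per row */
    fb.blocklens = malloc(brows * sizeof(*fb.blocklens));
    fb.indices   = malloc(brows * sizeof(*fb.indices));
    fb.size      = 0;
    fb.extent    = nrows_ds * ncols * elmt;            /* 256 bytes for the whole dataspace */

    for (uint64_t r = 0; r < brows; r++) {
        fb.blocklens[r] = (size_t)(bcols * elmt);                 /* 24 bytes per row   */
        fb.indices[r]   = ((row0 + r) * ncols + col0) * elmt;     /* 80, then 144       */
        fb.size        += fb.blocklens[r];
    }

    for (uint64_t r = 0; r < fb.count; r++)
        printf("block %llu: offset %llu, %zu bytes\n", (unsigned long long)r,
               (unsigned long long)fb.indices[r], fb.blocklens[r]);
    printf("total %llu bytes in an extent of %llu bytes\n",
           (unsigned long long)fb.size, (unsigned long long)fb.extent);

    free(fb.blocklens);
    free(fb.indices);
    return 0;
}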