From ed1979339aac623c7e8049603453c514777816b3 Mon Sep 17 00:00:00 2001 From: "Richard.Warren" Date: Mon, 27 Sep 2021 20:09:46 -0500 Subject: Various fixes which allow the IOR benchmark to run correctly --- src/H5FDioc.c | 1491 +++++++++++++------------- src/H5FDioc.h | 95 +- src/H5FDioc_threads.c | 382 +++---- src/H5FDsubfile_int.c | 552 +++++----- src/H5FDsubfile_mpi.c | 2618 +++++++++++++++++++++++----------------------- src/H5FDsubfiling.c | 134 ++- src/H5FDsubfiling.h | 93 +- src/H5FDsubfiling_priv.h | 172 +-- 8 files changed, 2819 insertions(+), 2718 deletions(-) diff --git a/src/H5FDioc.c b/src/H5FDioc.c index 5a59c20..b514448 100644 --- a/src/H5FDioc.c +++ b/src/H5FDioc.c @@ -19,6 +19,7 @@ /* This source code file is part of the H5FD driver module */ #include "H5FDdrvr_module.h" +#include "H5FDpublic.h" /* Basic H5FD definitions */ #include "H5Eprivate.h" /* Error handling */ #include "H5FDioc.h" /* IOC file driver */ #include "H5FDprivate.h" /* File drivers */ @@ -32,51 +33,9 @@ #include "H5FDsubfiling_priv.h" /* The driver identification number, initialized at runtime */ -static hid_t H5FD_IOC_g = 0; +static hid_t H5FD_IOC_g = 0; extern volatile int sf_shutdown_flag; -/* The information of this ioc */ -typedef struct H5FD_ioc_t { - H5FD_t pub; /* public stuff, must be first */ - int fd; /* the filesystem file descriptor */ - - H5FD_ioc_config_t fa; /* driver-specific file access properties */ - int mpi_rank; - int mpi_size; - H5FD_t *ioc_file; /* native HDF5 file pointer (sec2) */ - -#ifndef H5_HAVE_WIN32_API - /* On most systems the combination of device and i-node number uniquely - * identify a file. Note that Cygwin, MinGW and other Windows POSIX - * environments have the stat function (which fakes inodes) - * and will use the 'device + inodes' scheme as opposed to the - * Windows code further below. - */ - dev_t device; /* file device number */ - ino_t inode; /* file i-node number */ -#else - /* Files in windows are uniquely identified by the volume serial - * number and the file index (both low and high parts). - * - * There are caveats where these numbers can change, especially - * on FAT file systems. On NTFS, however, a file should keep - * those numbers the same until renamed or deleted (though you - * can use ReplaceFile() on NTFS to keep the numbers the same - * while renaming). - * - * See the MSDN "BY_HANDLE_FILE_INFORMATION Structure" entry for - * more information. - * - * http://msdn.microsoft.com/en-us/library/aa363788(v=VS.85).aspx - */ - DWORD nFileIndexLow; - DWORD nFileIndexHigh; - DWORD dwVolumeSerialNumber; - - HANDLE hFile; /* Native windows file handle */ -#endif /* H5_HAVE_WIN32_API */ -} H5FD_ioc_t; - /* * These macros check for overflow of various quantities. These macros * assume that HDoff_t is signed and haddr_t and size_t are unsigned. @@ -92,82 +51,66 @@ typedef struct H5FD_ioc_t { * which can be addressed entirely by the second * argument of the file seek function. */ -#define MAXADDR (((haddr_t)1 << (8 * sizeof(HDoff_t) - 1)) - 1) +#define MAXADDR (((haddr_t)1 << (8 * sizeof(HDoff_t) - 1)) - 1) #define ADDR_OVERFLOW(A) (HADDR_UNDEF == (A) || ((A) & ~(haddr_t)MAXADDR)) #define SIZE_OVERFLOW(Z) ((Z) & ~(hsize_t)MAXADDR) -#define REGION_OVERFLOW(A, Z) \ - (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || HADDR_UNDEF == (A) + (Z) || \ - (HDoff_t)((A) + (Z)) < (HDoff_t)(A)) +#define REGION_OVERFLOW(A, Z) \ + (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || HADDR_UNDEF == (A) + (Z) || (HDoff_t)((A) + (Z)) < (HDoff_t)(A)) #define H5FD_IOC_DEBUG_OP_CALLS 0 /* debugging print toggle; 0 disables */ #if H5FD_IOC_DEBUG_OP_CALLS -#define H5FD_IOC_LOG_CALL(name) \ - do { \ - HDprintf("called %s()\n", (name)); \ - HDfflush(stdout); \ - } while (0) +#define H5FD_IOC_LOG_CALL(name) \ + do { \ + HDprintf("called %s()\n", (name)); \ + HDfflush(stdout); \ + } while (0) #else #define H5FD_IOC_LOG_CALL(name) /* no-op */ #endif /* H5FD_IOC_DEBUG_OP_CALLS */ /* Public functions which are referenced but not found in this file */ -extern herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, - haddr_t addrs[], size_t sizes[], +extern herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], const void *bufs[] /* data_in */); -extern herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, - haddr_t addrs[], size_t sizes[], +extern herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], void *bufs[] /* data_out */); -extern int H5FD__close_subfiles(int64_t context_id); -extern int H5FD__open_subfiles(void *_config_info, uint64_t h5_file_id, int flags); -extern hid_t fid_map_to_context(hid_t sf_fid); +extern int H5FD__close_subfiles(int64_t context_id); +extern int H5FD__open_subfiles(void *_config_info, uint64_t h5_file_id, int flags); +extern hid_t fid_map_to_context(hid_t sf_fid); extern subfiling_context_t *get__subfiling_object(int64_t context_id); /* Private functions */ /* Prototypes */ -static herr_t H5FD__ioc_term(void); +static herr_t H5FD__ioc_term(void); static hsize_t H5FD__ioc_sb_size(H5FD_t *_file); -static herr_t H5FD__ioc_sb_encode(H5FD_t *_file, char *name /*out*/, - unsigned char *buf /*out*/); -static herr_t H5FD__ioc_sb_decode(H5FD_t *_file, const char *name, - const unsigned char *buf); -static void *H5FD__ioc_fapl_get(H5FD_t *_file); -static void *H5FD__ioc_fapl_copy(const void *_old_fa); -static herr_t H5FD__ioc_fapl_free(void *_fapl); -static H5FD_t *H5FD__ioc_open(const char *name, unsigned flags, hid_t fapl_id, - haddr_t maxaddr); -static herr_t H5FD__ioc_close(H5FD_t *_file); -static int H5FD__ioc_cmp(const H5FD_t *_f1, const H5FD_t *_f2); -static herr_t H5FD__ioc_query(const H5FD_t *_file, - unsigned long *flags /* out */); -static herr_t H5FD__ioc_get_type_map(const H5FD_t *_file, H5FD_mem_t *type_map); -static haddr_t H5FD__ioc_alloc(H5FD_t *file, H5FD_mem_t type, hid_t dxpl_id, - hsize_t size); -static herr_t H5FD__ioc_free(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, - haddr_t addr, hsize_t size); -static haddr_t H5FD__ioc_get_eoa(const H5FD_t *_file, - H5FD_mem_t H5_ATTR_UNUSED type); -static herr_t H5FD__ioc_set_eoa(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, - haddr_t addr); -static haddr_t H5FD__ioc_get_eof(const H5FD_t *_file, - H5FD_mem_t H5_ATTR_UNUSED type); -static herr_t H5FD__ioc_get_handle(H5FD_t *_file, hid_t H5_ATTR_UNUSED fapl, - void **file_handle); -static herr_t H5FD__ioc_read(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, - haddr_t addr, size_t size, void *buf); -static herr_t H5FD__ioc_write(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, - haddr_t addr, size_t size, const void *buf); -static herr_t H5FD__ioc_read_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count, - H5FD_mem_t types[], haddr_t addrs[], - size_t sizes[], void *bufs[] /* out */); -static herr_t H5FD__ioc_write_vector(H5FD_t *file, hid_t dxpl_id, - uint32_t count, H5FD_mem_t types[], - haddr_t addrs[], size_t sizes[], - const void *bufs[] /* in */); -static herr_t H5FD__ioc_flush(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); -static herr_t H5FD__ioc_truncate(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); -static herr_t H5FD__ioc_lock(H5FD_t *_file, hbool_t rw); -static herr_t H5FD__ioc_unlock(H5FD_t *_file); +static herr_t H5FD__ioc_sb_encode(H5FD_t *_file, char *name /*out*/, unsigned char *buf /*out*/); +static herr_t H5FD__ioc_sb_decode(H5FD_t *_file, const char *name, const unsigned char *buf); +static void * H5FD__ioc_fapl_get(H5FD_t *_file); +static void * H5FD__ioc_fapl_copy(const void *_old_fa); +static herr_t H5FD__ioc_fapl_free(void *_fapl); +static H5FD_t *H5FD__ioc_open(const char *name, unsigned flags, hid_t fapl_id, haddr_t maxaddr); +static herr_t H5FD__ioc_close(H5FD_t *_file); +static int H5FD__ioc_cmp(const H5FD_t *_f1, const H5FD_t *_f2); +static herr_t H5FD__ioc_query(const H5FD_t *_file, unsigned long *flags /* out */); +static herr_t H5FD__ioc_get_type_map(const H5FD_t *_file, H5FD_mem_t *type_map); +static haddr_t H5FD__ioc_alloc(H5FD_t *file, H5FD_mem_t type, hid_t dxpl_id, hsize_t size); +static herr_t H5FD__ioc_free(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, haddr_t addr, hsize_t size); +static haddr_t H5FD__ioc_get_eoa(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type); +static herr_t H5FD__ioc_set_eoa(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, haddr_t addr); +static haddr_t H5FD__ioc_get_eof(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type); +static herr_t H5FD__ioc_get_handle(H5FD_t *_file, hid_t H5_ATTR_UNUSED fapl, void **file_handle); +static herr_t H5FD__ioc_read(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, haddr_t addr, size_t size, + void *buf); +static herr_t H5FD__ioc_write(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, haddr_t addr, size_t size, + const void *buf); +static herr_t H5FD__ioc_read_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count, H5FD_mem_t types[], + haddr_t addrs[], size_t sizes[], void *bufs[] /* out */); +static herr_t H5FD__ioc_write_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count, H5FD_mem_t types[], + haddr_t addrs[], size_t sizes[], const void *bufs[] /* in */); +static herr_t H5FD__ioc_flush(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); +static herr_t H5FD__ioc_truncate(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); +static herr_t H5FD__ioc_lock(H5FD_t *_file, hbool_t rw); +static herr_t H5FD__ioc_unlock(H5FD_t *_file); /* static herr_t H5FD__ioc_ctl(H5FD_t *file, uint64_t op_code, uint64_t flags, const void *input, void **result); @@ -227,30 +170,33 @@ H5FL_DEFINE_STATIC(H5FD_ioc_config_t); * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__init_package(void) { - herr_t ret_value = SUCCEED; - FUNC_ENTER_NOAPI(FAIL) +static herr_t +H5FD__init_package(void) +{ + herr_t ret_value = SUCCEED; + FUNC_ENTER_NOAPI(FAIL) - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); #if 1 /* JRM */ - if (H5I_VFL != H5I_get_type(H5FD_IOC_g)) - H5FD_IOC_g = H5FD_register(&H5FD_ioc_g, sizeof(H5FD_class_t), FALSE); + if (H5I_VFL != H5I_get_type(H5FD_IOC_g)) + H5FD_IOC_g = H5FD_register(&H5FD_ioc_g, sizeof(H5FD_class_t), FALSE); #else /* JRM */ - if (H5I_VFL != H5I_get_type(H5FD_IOC_g)) { - HDfprintf(stdout, "H5FD_ioc_init(): calling H5FD_register()\n"); - H5FD_IOC_g = H5FD_register(&H5FD_ioc_g, sizeof(H5FD_class_t), FALSE); - } + if (H5I_VFL != H5I_get_type(H5FD_IOC_g)) { + HDfprintf(stdout, "H5FD_ioc_init(): calling H5FD_register()\n"); + H5FD_IOC_g = H5FD_register(&H5FD_ioc_g, sizeof(H5FD_class_t), FALSE); + } #endif /* JRM */ #if 0 /* JRM */ HDfprintf(stdout, "H5FD_ioc_init() IOC registerd. id = %lld \n", (int64_t)H5FD_IOC_g); #endif /* JRM */ - ret_value = H5FD_IOC_g; + if (H5I_INVALID_HID == H5FD_IOC_g) + HGOTO_ERROR(H5E_ID, H5E_CANTREGISTER, H5I_INVALID_HID, "unable to register file driver ID") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* H5FD__init_package() */ /*------------------------------------------------------------------------- @@ -263,20 +209,22 @@ done: * Failure: Negative *------------------------------------------------------------------------- */ -hid_t H5FD_ioc_init(void) { - hid_t ret_value = H5I_INVALID_HID; +hid_t +H5FD_ioc_init(void) +{ + hid_t ret_value = H5I_INVALID_HID; - FUNC_ENTER_NOAPI(FAIL) + FUNC_ENTER_NOAPI(FAIL) - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - if (H5I_VFL != H5I_get_type(H5FD_IOC_g)) - H5FD_IOC_g = H5FDregister(&H5FD_ioc_g); + if (H5I_VFL != H5I_get_type(H5FD_IOC_g)) + H5FD_IOC_g = H5FDregister(&H5FD_ioc_g); - ret_value = H5FD_IOC_g; + ret_value = H5FD_IOC_g; done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD_ioc_init() */ /*------------------------------------------------------------------------- @@ -290,11 +238,13 @@ done: * *------------------------------------------------------------------------- */ -void H5FD_ioc_set_shutdown_flag(int flag) { - sf_shutdown_flag = flag; - if (H5FD_IOC_g > 0) - usleep(100); - return; +void +H5FD_ioc_set_shutdown_flag(int flag) +{ + sf_shutdown_flag = flag; + if (H5FD_IOC_g > 0) + usleep(100); + return; } /* end H5FD_ioc_set_shutdown_flag() */ /*--------------------------------------------------------------------------- @@ -305,7 +255,8 @@ void H5FD_ioc_set_shutdown_flag(int flag) { * Returns: SUCCEED (Can't fail) *--------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_term(void) +static herr_t +H5FD__ioc_term(void) { FUNC_ENTER_NOAPI_NOINIT_NOERR // FUNC_ENTER_STATIC_NOERR @@ -335,30 +286,31 @@ static herr_t H5FD__ioc_term(void) * Return: 0 on success, -1 on error. *------------------------------------------------------------------------- */ -static int H5FD__copy_plist(hid_t fapl_id, hid_t *id_out_ptr) { - int ret_value = 0; - H5P_genplist_t *plist_ptr = NULL; +static int +H5FD__copy_plist(hid_t fapl_id, hid_t *id_out_ptr) +{ + int ret_value = 0; + H5P_genplist_t *plist_ptr = NULL; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - HDassert(id_out_ptr != NULL); + HDassert(id_out_ptr != NULL); - if (FALSE == H5P_isa_class(fapl_id, H5P_FILE_ACCESS)) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, -1, "not a file access property list"); + if (FALSE == H5P_isa_class(fapl_id, H5P_FILE_ACCESS)) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, -1, "not a file access property list"); - plist_ptr = (H5P_genplist_t *)H5I_object(fapl_id); - if (NULL == plist_ptr) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, -1, "unable to get property list"); + plist_ptr = (H5P_genplist_t *)H5I_object(fapl_id); + if (NULL == plist_ptr) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, -1, "unable to get property list"); - *id_out_ptr = H5P_copy_plist(plist_ptr, FALSE); - if (H5I_INVALID_HID == *id_out_ptr) - HGOTO_ERROR(H5E_VFL, H5E_BADTYPE, -1, - "unable to copy file access property list"); + *id_out_ptr = H5P_copy_plist(plist_ptr, FALSE); + if (H5I_INVALID_HID == *id_out_ptr) + HGOTO_ERROR(H5E_VFL, H5E_BADTYPE, -1, "unable to copy file access property list"); done: - FUNC_LEAVE_NOAPI(ret_value); + FUNC_LEAVE_NOAPI(ret_value); } /* end H5FD__copy_plist() */ /*------------------------------------------------------------------------- @@ -370,39 +322,38 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -herr_t H5Pset_fapl_ioc(hid_t fapl_id, H5FD_ioc_config_t *vfd_config) { - H5FD_ioc_config_t *info = NULL; - H5P_genplist_t *plist_ptr = NULL; - herr_t ret_value = SUCCEED; - - FUNC_ENTER_API(FAIL) - H5TRACE2("e", "i*!", fapl_id, vfd_config); - - H5FD_IOC_LOG_CALL(FUNC); - - if (H5FD_IOC_FAPL_T_MAGIC != vfd_config->common.magic) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "invalid configuration (magic number mismatch)") - if (H5FD_CURR_IOC_FAPL_T_VERSION != vfd_config->common.version) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "invalid config (version number mismatch)") - if (NULL == (plist_ptr = (H5P_genplist_t *)H5I_object(fapl_id))) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a valid property list") - - info = H5FL_CALLOC(H5FD_ioc_config_t); - if (NULL == info) - HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, FAIL, - "unable to allocate file access property list struct") - - memcpy(info, vfd_config, sizeof(H5FD_ioc_config_t)); - info->common.ioc_fapl_id = fapl_id; - ret_value = H5P_set_driver(plist_ptr, H5FD_IOC, info); +herr_t +H5Pset_fapl_ioc(hid_t fapl_id, H5FD_ioc_config_t *vfd_config) +{ + H5FD_ioc_config_t *info = NULL; + H5P_genplist_t * plist_ptr = NULL; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_API(FAIL) + H5TRACE2("e", "i*!", fapl_id, vfd_config); + + H5FD_IOC_LOG_CALL(FUNC); + + if (H5FD_IOC_FAPL_T_MAGIC != vfd_config->common.magic) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "invalid configuration (magic number mismatch)") + if (H5FD_CURR_IOC_FAPL_T_VERSION != vfd_config->common.version) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "invalid config (version number mismatch)") + if (NULL == (plist_ptr = (H5P_genplist_t *)H5I_object(fapl_id))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a valid property list") + + info = H5FL_CALLOC(H5FD_ioc_config_t); + if (NULL == info) + HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, FAIL, "unable to allocate file access property list struct") + + memcpy(info, vfd_config, sizeof(H5FD_ioc_config_t)); + info->common.ioc_fapl_id = fapl_id; + ret_value = H5P_set_driver(plist_ptr, H5FD_IOC, info); done: - if (info) - info = H5FL_FREE(H5FD_ioc_config_t, info); + if (info) + info = H5FL_FREE(H5FD_ioc_config_t, info); - FUNC_LEAVE_API(ret_value) + FUNC_LEAVE_API(ret_value) } /* end H5Pset_fapl_ioc() */ /*------------------------------------------------------------------------- @@ -416,19 +367,21 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t fapl_get_ioc_defaults(H5FD_ioc_config_t *fa) { - herr_t ret_value = SUCCEED; - - fa->common.magic = H5FD_IOC_FAPL_T_MAGIC; - fa->common.version = H5FD_CURR_IOC_FAPL_T_VERSION; - fa->common.ioc_fapl_id = H5P_DEFAULT; - fa->common.stripe_count = 0; - fa->common.stripe_depth = H5FD_DEFAULT_STRIPE_DEPTH; - fa->common.ioc_selection = SELECT_IOC_ONE_PER_NODE; - - /* Specific to this IO Concentrator */ - fa->thread_pool_count = H5FD_IOC_THREAD_POOL_SIZE; - return (ret_value); +static herr_t +fapl_get_ioc_defaults(H5FD_ioc_config_t *fa) +{ + herr_t ret_value = SUCCEED; + + fa->common.magic = H5FD_IOC_FAPL_T_MAGIC; + fa->common.version = H5FD_CURR_IOC_FAPL_T_VERSION; + fa->common.ioc_fapl_id = H5P_DEFAULT; + fa->common.stripe_count = 0; + fa->common.stripe_depth = H5FD_DEFAULT_STRIPE_DEPTH; + fa->common.ioc_selection = SELECT_IOC_ONE_PER_NODE; + + /* Specific to this IO Concentrator */ + fa->thread_pool_count = H5FD_IOC_THREAD_POOL_SIZE; + return (ret_value); } /* end fapl_get_ioc_defaults() */ /*------------------------------------------------------------------------- @@ -443,41 +396,43 @@ static herr_t fapl_get_ioc_defaults(H5FD_ioc_config_t *fa) { * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -herr_t H5Pget_fapl_ioc(hid_t fapl_id, H5FD_ioc_config_t *config_out) { - const H5FD_ioc_config_t *config_ptr = NULL; - H5P_genplist_t *plist_ptr = NULL; - herr_t ret_value = SUCCEED; - - FUNC_ENTER_API(FAIL) - H5TRACE2("e", "i*!", fapl_id, config_out); - - H5FD_IOC_LOG_CALL(FUNC); - - /* Check arguments */ - if (config_out == NULL) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "config_out is NULL") - - plist_ptr = H5P_object_verify(fapl_id, H5P_FILE_ACCESS); - if (plist_ptr == NULL) { - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access list") - } - - config_ptr = (const H5FD_ioc_config_t *)H5P_peek_driver_info(plist_ptr); - if (config_ptr == NULL) { - memset(config_out, 0, sizeof(H5FD_ioc_config_t)); - ret_value = fapl_get_ioc_defaults(config_out); - } else { - /* Copy the subfiling fapl data out */ - HDmemcpy(config_out, config_ptr, sizeof(H5FD_ioc_config_t)); - - /* Copy the driver info value */ - if (H5FD__copy_plist(config_ptr->common.ioc_fapl_id, - &(config_out->common.ioc_fapl_id)) < 0) - HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, FAIL, "can't copy IOC FAPL"); - } +herr_t +H5Pget_fapl_ioc(hid_t fapl_id, H5FD_ioc_config_t *config_out) +{ + const H5FD_ioc_config_t *config_ptr = NULL; + H5P_genplist_t * plist_ptr = NULL; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_API(FAIL) + H5TRACE2("e", "i*!", fapl_id, config_out); + + H5FD_IOC_LOG_CALL(FUNC); + + /* Check arguments */ + if (config_out == NULL) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "config_out is NULL") + + plist_ptr = H5P_object_verify(fapl_id, H5P_FILE_ACCESS); + if (plist_ptr == NULL) { + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a file access list") + } + + config_ptr = (const H5FD_ioc_config_t *)H5P_peek_driver_info(plist_ptr); + if (config_ptr == NULL) { + memset(config_out, 0, sizeof(H5FD_ioc_config_t)); + ret_value = fapl_get_ioc_defaults(config_out); + } + else { + /* Copy the subfiling fapl data out */ + HDmemcpy(config_out, config_ptr, sizeof(H5FD_ioc_config_t)); + + /* Copy the driver info value */ + if (H5FD__copy_plist(config_ptr->common.ioc_fapl_id, &(config_out->common.ioc_fapl_id)) < 0) + HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, FAIL, "can't copy IOC FAPL"); + } done: - FUNC_LEAVE_API(ret_value) + FUNC_LEAVE_API(ret_value) } /* end H5Pget_fapl_ioc() */ /*------------------------------------------------------------------------- @@ -488,21 +443,22 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_flush(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, - hbool_t closing) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_flush(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, hbool_t closing) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Public API for dxpl "context" */ - if (H5FDflush(file->ioc_file, dxpl_id, closing) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTFLUSH, FAIL, "unable to flush R/W file") + /* Public API for dxpl "context" */ + if (H5FDflush(file->ioc_file, dxpl_id, closing) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTFLUSH, FAIL, "unable to flush R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_flush() */ /*------------------------------------------------------------------------- @@ -519,33 +475,32 @@ done: * The contents of BUF are undefined. *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_read(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, - hid_t H5_ATTR_UNUSED dxpl_id, haddr_t addr, - size_t size, void *buf) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; +static herr_t +H5FD__ioc_read(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, hid_t H5_ATTR_UNUSED dxpl_id, haddr_t addr, + size_t size, void *buf) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - HDassert(file && file->pub.cls); - HDassert(buf); + HDassert(file && file->pub.cls); + HDassert(buf); - /* Check for overflow conditions */ - if (!H5F_addr_defined(addr)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "addr undefined, addr = %llu", - (unsigned long long)addr) - if (REGION_OVERFLOW(addr, size)) - HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, FAIL, "addr overflow, addr = %llu", - (unsigned long long)addr) + /* Check for overflow conditions */ + if (!H5F_addr_defined(addr)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "addr undefined, addr = %llu", (unsigned long long)addr) + if (REGION_OVERFLOW(addr, size)) + HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, FAIL, "addr overflow, addr = %llu", (unsigned long long)addr) - /* Public API for dxpl "context" */ - if (H5FDread(file->ioc_file, type, dxpl_id, addr, size, buf) < 0) - HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "Reading from R/W channel failed") + /* Public API for dxpl "context" */ + if (H5FDread(file->ioc_file, type, dxpl_id, addr, size, buf) < 0) + HGOTO_ERROR(H5E_VFL, H5E_READERROR, FAIL, "Reading from R/W channel failed") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_read() */ /*------------------------------------------------------------------------- @@ -558,118 +513,112 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_write(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, - hid_t dxpl_id, haddr_t addr, size_t size, const void *buf) { - H5FD_ioc_t *file_ptr = (H5FD_ioc_t *)_file; - H5P_genplist_t *plist_ptr = NULL; - herr_t ret_value = SUCCEED; - hid_t h5_fid; +static herr_t +H5FD__ioc_write(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, hid_t dxpl_id, haddr_t addr, size_t size, + const void *buf) +{ + H5FD_ioc_t * file_ptr = (H5FD_ioc_t *)_file; + H5P_genplist_t *plist_ptr = NULL; + herr_t ret_value = SUCCEED; + hid_t h5_fid; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - if (NULL == (plist_ptr = (H5P_genplist_t *)H5I_object(dxpl_id))) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a property list") + if (NULL == (plist_ptr = (H5P_genplist_t *)H5I_object(dxpl_id))) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a property list") - addr += _file->base_addr; - h5_fid = (hid_t)file_ptr->inode; - ret_value = H5FD__write_vector_internal(h5_fid, 1, &addr, &size, &buf); + addr += _file->base_addr; + h5_fid = (hid_t)file_ptr->inode; + ret_value = H5FD__write_vector_internal(h5_fid, 1, &addr, &size, &buf); done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_write() */ -static herr_t H5FD__ioc_read_vector(H5FD_t *_file, hid_t dxpl_id, - uint32_t count, H5FD_mem_t types[], - haddr_t addrs[], size_t sizes[], - void *bufs[] /* out */) { - H5FD_ioc_t *file_ptr = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ - hid_t h5_fid; - - FUNC_ENTER_STATIC - - /* Check arguments */ - if (!file_ptr) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file pointer cannot be NULL") - - if ((!types) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "types parameter can't be NULL if count is positive") - - if ((!addrs) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "addrs parameter can't be NULL if count is positive") - - if ((!sizes) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "sizes parameter can't be NULL if count is positive") - - if ((!bufs) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "bufs parameter can't be NULL if count is positive") - - /* Get the default dataset transfer property list if the user didn't provide - * one */ - if (H5P_DEFAULT == dxpl_id) { - dxpl_id = H5P_DATASET_XFER_DEFAULT; - } else { - if (TRUE != H5P_isa_class(dxpl_id, H5P_DATASET_XFER)) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, - "not a data transfer property list") - } - - h5_fid = (hid_t)file_ptr->inode; - ret_value = H5FD__read_vector_internal(h5_fid, count, addrs, sizes, bufs); +static herr_t +H5FD__ioc_read_vector(H5FD_t *_file, hid_t dxpl_id, uint32_t count, H5FD_mem_t types[], haddr_t addrs[], + size_t sizes[], void *bufs[] /* out */) +{ + H5FD_ioc_t *file_ptr = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ + hid_t h5_fid; + + FUNC_ENTER_STATIC + + /* Check arguments */ + if (!file_ptr) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file pointer cannot be NULL") + + if ((!types) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "types parameter can't be NULL if count is positive") + + if ((!addrs) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "addrs parameter can't be NULL if count is positive") + + if ((!sizes) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "sizes parameter can't be NULL if count is positive") + + if ((!bufs) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "bufs parameter can't be NULL if count is positive") + + /* Get the default dataset transfer property list if the user didn't provide + * one */ + if (H5P_DEFAULT == dxpl_id) { + dxpl_id = H5P_DATASET_XFER_DEFAULT; + } + else { + if (TRUE != H5P_isa_class(dxpl_id, H5P_DATASET_XFER)) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") + } + + h5_fid = (hid_t)file_ptr->inode; + ret_value = H5FD__read_vector_internal(h5_fid, count, addrs, sizes, bufs); done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } -static herr_t H5FD__ioc_write_vector(H5FD_t *_file, hid_t dxpl_id, - uint32_t count, H5FD_mem_t types[], - haddr_t addrs[], size_t sizes[], - const void *bufs[] /* in */) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ - hid_t h5_fid; - - FUNC_ENTER_STATIC - - /* Check arguments */ - if (!file) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file pointer cannot be NULL") - - if ((!types) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "types parameter can't be NULL if count is positive") - - if ((!addrs) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "addrs parameter can't be NULL if count is positive") - - if ((!sizes) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "sizes parameter can't be NULL if count is positive") - - if ((!bufs) && (count > 0)) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, - "bufs parameter can't be NULL if count is positive") - - /* Get the default dataset transfer property list if the user didn't provide - * one */ - if (H5P_DEFAULT == dxpl_id) { - dxpl_id = H5P_DATASET_XFER_DEFAULT; - } else { - if (TRUE != H5P_isa_class(dxpl_id, H5P_DATASET_XFER)) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, - "not a data transfer property list") - } - h5_fid = (hid_t)file->inode; - ret_value = H5FD__write_vector_internal(h5_fid, count, addrs, sizes, bufs); +static herr_t +H5FD__ioc_write_vector(H5FD_t *_file, hid_t dxpl_id, uint32_t count, H5FD_mem_t types[], haddr_t addrs[], + size_t sizes[], const void *bufs[] /* in */) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ + hid_t h5_fid; + + FUNC_ENTER_STATIC + + /* Check arguments */ + if (!file) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "file pointer cannot be NULL") + + if ((!types) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "types parameter can't be NULL if count is positive") + + if ((!addrs) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "addrs parameter can't be NULL if count is positive") + + if ((!sizes) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "sizes parameter can't be NULL if count is positive") + + if ((!bufs) && (count > 0)) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "bufs parameter can't be NULL if count is positive") + + /* Get the default dataset transfer property list if the user didn't provide + * one */ + if (H5P_DEFAULT == dxpl_id) { + dxpl_id = H5P_DATASET_XFER_DEFAULT; + } + else { + if (TRUE != H5P_isa_class(dxpl_id, H5P_DATASET_XFER)) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, FAIL, "not a data transfer property list") + } + h5_fid = (hid_t)file->inode; + ret_value = H5FD__write_vector_internal(h5_fid, count, addrs, sizes, bufs); done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FDioc__write_vector() */ /*------------------------------------------------------------------------- @@ -684,17 +633,19 @@ done: * Failure: NULL *------------------------------------------------------------------------- */ -static void *H5FD__ioc_fapl_get(H5FD_t *_file) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - void *ret_value = NULL; +static void * +H5FD__ioc_fapl_get(H5FD_t *_file) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + void * ret_value = NULL; - FUNC_ENTER_STATIC_NOERR + FUNC_ENTER_STATIC_NOERR - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - ret_value = H5FD__ioc_fapl_copy(&(file->fa)); + ret_value = H5FD__ioc_fapl_copy(&(file->fa)); - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_fapl_get() */ /*------------------------------------------------------------------------- @@ -706,39 +657,38 @@ static void *H5FD__ioc_fapl_get(H5FD_t *_file) { * Failure: NULL *------------------------------------------------------------------------- */ -static void *H5FD__ioc_fapl_copy(const void *_old_fa) { - const H5FD_ioc_config_t *old_fa_ptr = (const H5FD_ioc_config_t *)_old_fa; - H5FD_ioc_config_t *new_fa_ptr = NULL; - void *ret_value = NULL; +static void * +H5FD__ioc_fapl_copy(const void *_old_fa) +{ + const H5FD_ioc_config_t *old_fa_ptr = (const H5FD_ioc_config_t *)_old_fa; + H5FD_ioc_config_t * new_fa_ptr = NULL; + void * ret_value = NULL; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - HDassert(old_fa_ptr); + HDassert(old_fa_ptr); - new_fa_ptr = H5FL_CALLOC(H5FD_ioc_config_t); - if (NULL == new_fa_ptr) - HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, - "unable to allocate log file FAPL") + new_fa_ptr = H5FL_CALLOC(H5FD_ioc_config_t); + if (NULL == new_fa_ptr) + HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, "unable to allocate log file FAPL") - HDmemcpy(new_fa_ptr, old_fa_ptr, sizeof(H5FD_ioc_config_t)); - HDstrncpy(new_fa_ptr->common.file_path, old_fa_ptr->common.file_path, - H5FD_IOC_PATH_MAX); + HDmemcpy(new_fa_ptr, old_fa_ptr, sizeof(H5FD_ioc_config_t)); + HDstrncpy(new_fa_ptr->common.file_path, old_fa_ptr->common.file_path, H5FD_IOC_PATH_MAX); - /* Copy the FAPL */ - if (H5FD__copy_plist(old_fa_ptr->common.ioc_fapl_id, - &(new_fa_ptr->common.ioc_fapl_id)) < 0) - HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "can't copy the IOC FAPL"); + /* Copy the FAPL */ + if (H5FD__copy_plist(old_fa_ptr->common.ioc_fapl_id, &(new_fa_ptr->common.ioc_fapl_id)) < 0) + HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "can't copy the IOC FAPL"); - ret_value = (void *)new_fa_ptr; + ret_value = (void *)new_fa_ptr; done: - if (NULL == ret_value) - if (new_fa_ptr) - new_fa_ptr = H5FL_FREE(H5FD_ioc_config_t, new_fa_ptr); + if (NULL == ret_value) + if (new_fa_ptr) + new_fa_ptr = H5FL_FREE(H5FD_ioc_config_t, new_fa_ptr); - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_fapl_copy() */ /*-------------------------------------------------------------------------- @@ -749,25 +699,27 @@ done: * Return: SUCCEED/FAIL *-------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_fapl_free(void *_fapl) { - H5FD_ioc_config_t *fapl = (H5FD_ioc_config_t *)_fapl; - herr_t ret_value = SUCCEED; +static herr_t +H5FD__ioc_fapl_free(void *_fapl) +{ + H5FD_ioc_config_t *fapl = (H5FD_ioc_config_t *)_fapl; + herr_t ret_value = SUCCEED; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Check arguments */ - HDassert(fapl); + /* Check arguments */ + HDassert(fapl); - if (H5I_dec_ref(fapl->common.ioc_fapl_id) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTDEC, FAIL, "can't close W/O FAPL ID") + if (H5I_dec_ref(fapl->common.ioc_fapl_id) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTDEC, FAIL, "can't close W/O FAPL ID") - /* Free the property list */ - fapl = H5FL_FREE(H5FD_ioc_config_t, fapl); + /* Free the property list */ + fapl = H5FL_FREE(H5FD_ioc_config_t, fapl); done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_fapl_free() */ /*------------------------------------------------------------------------- @@ -781,199 +733,201 @@ done: * Failure: NULL *------------------------------------------------------------------------- */ -static H5FD_t *H5FD__ioc_open(const char *name, unsigned flags, - hid_t ioc_fapl_id, haddr_t maxaddr) { - H5FD_ioc_t *file_ptr = NULL; /* Ioc VFD info */ - const H5FD_ioc_config_t *fapl_ptr = NULL; /* Driver-specific property list */ - H5FD_class_t *driver = NULL; /* VFD for file */ - H5FD_driver_prop_t driver_prop; /* Property for driver ID & info */ - H5P_genplist_t *plist_ptr = NULL; - H5FD_t *ret_value = NULL; - int mpi_enabled = 0; - FUNC_ENTER_STATIC - - H5FD_IOC_LOG_CALL(FUNC); - - /* Check arguments */ - if (!name || !*name) - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "invalid file name") - if (0 == maxaddr || HADDR_UNDEF == maxaddr) - HGOTO_ERROR(H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr") - if (ADDR_OVERFLOW(maxaddr)) - HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, NULL, "bogus maxaddr") - if ((H5P_FILE_ACCESS_DEFAULT == ioc_fapl_id) || - (H5FD_IOC != H5Pget_driver(ioc_fapl_id))) - /* presupposes that H5P_FILE_ACCESS_DEFAULT is not a ioc */ - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "driver is not ioc") - - /* We should validate that the application has been initialized - * with MPI_Init_thread and that the library supports - * MPI_THREAD_MULTIPLE - */ - if (MPI_Initialized(&mpi_enabled) == MPI_SUCCESS) { - int mpi_provides = 0; - MPI_Query_thread(&mpi_provides); - if (mpi_provides != MPI_THREAD_MULTIPLE) { - HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "Subfiling requires the use of MPI_THREAD_MULTIPLE") +static H5FD_t * +H5FD__ioc_open(const char *name, unsigned flags, hid_t ioc_fapl_id, haddr_t maxaddr) +{ + H5FD_ioc_t * file_ptr = NULL; /* Ioc VFD info */ + const H5FD_ioc_config_t *fapl_ptr = NULL; /* Driver-specific property list */ + H5FD_class_t * driver = NULL; /* VFD for file */ + H5FD_driver_prop_t driver_prop; /* Property for driver ID & info */ + H5P_genplist_t * plist_ptr = NULL; + H5FD_t * ret_value = NULL; + int mpi_enabled = 0; + FUNC_ENTER_STATIC + + H5FD_IOC_LOG_CALL(FUNC); + + /* Check arguments */ + if (!name || !*name) + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "invalid file name") + if (0 == maxaddr || HADDR_UNDEF == maxaddr) + HGOTO_ERROR(H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr") + if (ADDR_OVERFLOW(maxaddr)) + HGOTO_ERROR(H5E_ARGS, H5E_OVERFLOW, NULL, "bogus maxaddr") + if ((H5P_FILE_ACCESS_DEFAULT == ioc_fapl_id) || (H5FD_IOC != H5Pget_driver(ioc_fapl_id))) + /* presupposes that H5P_FILE_ACCESS_DEFAULT is not a ioc */ + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "driver is not ioc") + + /* We should validate that the application has been initialized + * with MPI_Init_thread and that the library supports + * MPI_THREAD_MULTIPLE + */ + if (MPI_Initialized(&mpi_enabled) == MPI_SUCCESS) { + int mpi_provides = 0; + MPI_Query_thread(&mpi_provides); + if (mpi_provides != MPI_THREAD_MULTIPLE) { + HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "Subfiling requires the use of MPI_THREAD_MULTIPLE") + } } - } - file_ptr = (H5FD_ioc_t *)H5FL_CALLOC(H5FD_ioc_t); - if (NULL == file_ptr) - HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, "unable to allocate file struct") + file_ptr = (H5FD_ioc_t *)H5FL_CALLOC(H5FD_ioc_t); + if (NULL == file_ptr) + HGOTO_ERROR(H5E_VFL, H5E_CANTALLOC, NULL, "unable to allocate file struct") /* Get some basic MPI information */ - MPI_Comm_size(MPI_COMM_WORLD, &file_ptr->mpi_size); - MPI_Comm_rank(MPI_COMM_WORLD, &file_ptr->mpi_rank); - - /* Get the driver-specific file access properties */ - plist_ptr = (H5P_genplist_t *)H5I_object(ioc_fapl_id); - if (NULL == plist_ptr) - HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a file access property list") - - fapl_ptr = (const H5FD_ioc_config_t *)H5P_peek_driver_info(plist_ptr); - if (NULL == fapl_ptr) - HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "unable to get VFL driver info") - - /* Fill in the file config values */ - memcpy(&file_ptr->fa, fapl_ptr, sizeof(H5FD_ioc_config_t)); - - /* Extend the config info with file_path and file_dir */ - if (HDrealpath(name, file_ptr->fa.common.file_path) != NULL) { - char *path = HDstrdup(file_ptr->fa.common.file_path); - char *directory = dirname(path); - HDstrcpy(file_ptr->fa.common.file_dir, directory); - HDfree(path); - } - - /* Copy the ioc FAPL. */ - if (H5FD__copy_plist(fapl_ptr->common.ioc_fapl_id, - &(file_ptr->fa.common.ioc_fapl_id)) < 0) - HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "can't copy W/O FAPL"); - - /* Check the "native" driver (sec2 or mpio) */ - plist_ptr = (H5P_genplist_t *)H5I_object(fapl_ptr->common.ioc_fapl_id); - - if (H5P_peek(plist_ptr, H5F_ACS_FILE_DRV_NAME, &driver_prop) < 0) - HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "can't get driver ID & info") - if (NULL == (driver = (H5FD_class_t *)H5I_object(driver_prop.driver_id))) - HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, - "invalid driver ID in file access property list") - - if (strncmp(driver->name, "sec2", 4) == 0) { - uint64_t inode_id = (uint64_t)-1; - int ioc_flags = O_RDWR; - - /* Translate the HDF5 file open flags into standard POSIX open flags */ - if (flags & H5F_ACC_TRUNC) - ioc_flags |= O_TRUNC; - if (flags & H5F_ACC_CREAT) - ioc_flags |= O_CREAT; - - /* sec2 open the file */ - file_ptr->ioc_file = H5FD_open(file_ptr->fa.common.file_path, flags, - fapl_ptr->common.ioc_fapl_id, HADDR_UNDEF); - if (file_ptr->ioc_file) { - h5_stat_t sb; - H5FD_sec2_t *hdf_file = (H5FD_sec2_t *)file_ptr->ioc_file; - if (HDfstat(hdf_file->fd, &sb) < 0) - HSYS_GOTO_ERROR(H5E_FILE, H5E_BADFILE, NULL, "unable to fstat file") - /* Get the inode info an copy the open file descriptor - * The latter is used to pass to the subfiling code to use - * as an alternative to opening a new subfiling file, e.g. nnn_0_of_N.h5 - * - * We will use the user named HDF5 file as the zeroth subfile. - * Because of this we need prevent the new file opening as the zeroth - * subfile. For this reason, we will pass along a copy of the sec2 opened - * file descriptor. - */ - file_ptr->inode = inode_id = sb.st_ino; - file_ptr->fd = HDdup(hdf_file->fd); - // HDclose(hdf_file->fd); + MPI_Comm_size(MPI_COMM_WORLD, &file_ptr->mpi_size); + MPI_Comm_rank(MPI_COMM_WORLD, &file_ptr->mpi_rank); + + /* Get the driver-specific file access properties */ + plist_ptr = (H5P_genplist_t *)H5I_object(ioc_fapl_id); + if (NULL == plist_ptr) + HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a file access property list") + + fapl_ptr = (const H5FD_ioc_config_t *)H5P_peek_driver_info(plist_ptr); + if (NULL == fapl_ptr) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "unable to get VFL driver info") + + /* Fill in the file config values */ + memcpy(&file_ptr->fa, fapl_ptr, sizeof(H5FD_ioc_config_t)); + + /* Extend the config info with file_path and file_dir */ + if (HDrealpath(name, file_ptr->fa.common.file_path) != NULL) { + char *path = HDstrdup(file_ptr->fa.common.file_path); + char *directory = dirname(path); + HDstrcpy(file_ptr->fa.common.file_dir, directory); + HDfree(path); } - else goto done; + /* Copy the ioc FAPL. */ + if (H5FD__copy_plist(fapl_ptr->common.ioc_fapl_id, &(file_ptr->fa.common.ioc_fapl_id)) < 0) + HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "can't copy W/O FAPL"); + + /* Check the "native" driver (sec2 or mpio) */ + plist_ptr = (H5P_genplist_t *)H5I_object(fapl_ptr->common.ioc_fapl_id); + + if (H5P_peek(plist_ptr, H5F_ACS_FILE_DRV_NAME, &driver_prop) < 0) + HGOTO_ERROR(H5E_PLIST, H5E_CANTGET, NULL, "can't get driver ID & info") + if (NULL == (driver = (H5FD_class_t *)H5I_object(driver_prop.driver_id))) + HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, NULL, "invalid driver ID in file access property list") + + if (strncmp(driver->name, "sec2", 4) == 0) { + uint64_t inode_id = (uint64_t)-1; + int ioc_flags = O_RDWR; + + /* Translate the HDF5 file open flags into standard POSIX open flags */ + if (flags & H5F_ACC_TRUNC) + ioc_flags |= O_TRUNC; + if (flags & H5F_ACC_CREAT) + ioc_flags |= O_CREAT; + + /* sec2 open the file */ + file_ptr->ioc_file = + H5FD_open(file_ptr->fa.common.file_path, flags, fapl_ptr->common.ioc_fapl_id, HADDR_UNDEF); + if (file_ptr->ioc_file) { + h5_stat_t sb; + H5FD_sec2_t *hdf_file = (H5FD_sec2_t *)file_ptr->ioc_file; + if (HDfstat(hdf_file->fd, &sb) < 0) + HSYS_GOTO_ERROR(H5E_FILE, H5E_BADFILE, NULL, "unable to fstat file") + /* Get the inode info and copy the open file descriptor + * The latter is used to pass to the subfiling code to use + * as an alternative to opening a new subfiling file, e.g. nnn_0_of_N.h5 + */ + file_ptr->inode = inode_id = sb.st_ino; + // file_ptr->hdf_fd_dup = HDdup(hdf_file->fd); + } + else + goto done; - /* See: H5FDsubfile_int.c */ - if (H5FD__open_subfiles((void *)&file_ptr->fa, inode_id, ioc_flags) < 0) - HGOTO_ERROR(H5E_FILE, H5E_CANTOPENFILE, NULL, - "unable to open subfiling files = %s\n", name) + /* See: H5FDsubfile_int.c: returns error count! */ + if (H5FD__open_subfiles((void *)&file_ptr->fa, inode_id, ioc_flags) > 0) + HGOTO_ERROR(H5E_FILE, H5E_CANTOPENFILE, NULL, "unable to open subfiling files = %s\n", name) - else if (file_ptr->fd > 0) { - subfiling_context_t *sf_context = - get__subfiling_object(file_ptr->fa.common.context_id); + else if (file_ptr->inode > 0) { /* No errors opening the subfiles */ + subfiling_context_t *sf_context = get__subfiling_object(file_ptr->fa.common.context_id); #if 0 /* JRM */ /* original */ if (sf_context) { #else /* JRM */ /* Richard's fix */ - if (sf_context && sf_context->topology->rank_is_ioc) { + if (sf_context && sf_context->topology->rank_is_ioc) { #endif - if (initialize_ioc_threads(sf_context) < 0) { - HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, - "Unable to initialize IOC threads") + if (initialize_ioc_threads(sf_context) < 0) { + HGOTO_ERROR(H5E_ARGS, H5E_BADVALUE, NULL, "Unable to initialize IOC threads") + } } - } } - } - - else { +} +else +{ HDputs("We only support sec2 file opens at the moment."); - HGOTO_ERROR(H5E_FILE, H5E_CANTOPENFILE, NULL, "unable to open file = %s\n", - name) - } + HGOTO_ERROR(H5E_FILE, H5E_CANTOPENFILE, NULL, "unable to open file = %s\n", name) +} - ret_value = (H5FD_t *)file_ptr; +ret_value = (H5FD_t *)file_ptr; -done: - if (NULL == ret_value) { +done : if (NULL == ret_value) +{ if (file_ptr) { - if (file_ptr->ioc_file) - H5FD_close(file_ptr->ioc_file); - H5FL_FREE(H5FD_ioc_t, file_ptr); + if (file_ptr->ioc_file) + H5FD_close(file_ptr->ioc_file); + H5FL_FREE(H5FD_ioc_t, file_ptr); } - } /* end if error */ +} /* end if error */ - FUNC_LEAVE_NOAPI(ret_value) +FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_open() */ /*------------------------------------------------------------------------- * Function: H5FD__ioc_close * - * Purpose: Closes files on both read-write and write-only channels. + * Purpose: Closes files * * Return: Success: SUCCEED * Failure: FAIL, file not closed. *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_close(H5FD_t *_file) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; +static herr_t +H5FD__ioc_close(H5FD_t *_file) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; + // subfiling_context_t *sf_context = NULL; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Sanity check */ - HDassert(file); + /* Sanity check */ + HDassert(file); +#ifdef VERBOSE + sf_context = (subfiling_context_t *)get__subfiling_object(file->fa.common.context_id); + if (sf_context->topology->rank_is_ioc) + printf("[%s %d] fd=%d\n", __func__, file->mpi_rank, sf_context->sf_fid); + else + printf("[%s %d] fd=*\n", __func__, file->mpi_rank); + fflush(stdout); +#endif - if (H5I_dec_ref(file->fa.common.ioc_fapl_id) < 0) - HGOTO_ERROR(H5E_VFL, H5E_ARGS, FAIL, "can't close W/O FAPL") + if (H5I_dec_ref(file->fa.common.ioc_fapl_id) < 0) + HGOTO_ERROR(H5E_VFL, H5E_ARGS, FAIL, "can't close W/O FAPL") - /* Call the sec2 close */ - if (file->ioc_file) - if (H5FD_close(file->ioc_file) == FAIL) - HGOTO_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, "unable to close HDF5 file") + /* Call the sec2 close */ + if (file->ioc_file) { + if (H5FD_close(file->ioc_file) == FAIL) + HGOTO_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, "unable to close HDF5 file") + } - /* See: H5FDsubfile_int.c */ - if (H5FD__close_subfiles(file->fa.common.context_id) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, - "unable to close subfiling file(s)") + /* See: H5FDsubfile_int.c */ + if (H5FD__close_subfiles(file->fa.common.context_id) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTCLOSEFILE, FAIL, "unable to close subfiling file(s)") - /* Release the file info */ - file = H5FL_FREE(H5FD_ioc_t, file); - file = NULL; + /* dup'ed in the H5FD__ioc_open function (see above) */ + HDclose(file->hdf_fd_dup); + /* Release the file info */ + file = H5FL_FREE(H5FD_ioc_t, file); + file = NULL; done: - FUNC_LEAVE_NOAPI(ret_value) + + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_close() */ /*------------------------------------------------------------------------- @@ -988,24 +942,25 @@ done: * Failure: HADDR_UNDEF *------------------------------------------------------------------------- */ -static haddr_t H5FD__ioc_get_eoa(const H5FD_t *_file, - H5FD_mem_t H5_ATTR_UNUSED type) { - const H5FD_ioc_t *file = (const H5FD_ioc_t *)_file; - haddr_t ret_value = HADDR_UNDEF; +static haddr_t +H5FD__ioc_get_eoa(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type) +{ + const H5FD_ioc_t *file = (const H5FD_ioc_t *)_file; + haddr_t ret_value = HADDR_UNDEF; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Sanity check */ - HDassert(file); - HDassert(file->ioc_file); + /* Sanity check */ + HDassert(file); + HDassert(file->ioc_file); - if ((ret_value = H5FD_get_eoa(file->ioc_file, type)) == HADDR_UNDEF) - HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, HADDR_UNDEF, "unable to get eoa") + if ((ret_value = H5FD_get_eoa(file->ioc_file, type)) == HADDR_UNDEF) + HGOTO_ERROR(H5E_VFL, H5E_BADVALUE, HADDR_UNDEF, "unable to get eoa") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_get_eoa */ /*------------------------------------------------------------------------- @@ -1018,25 +973,26 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_set_eoa(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, - haddr_t addr) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_set_eoa(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, haddr_t addr) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC) + H5FD_IOC_LOG_CALL(FUNC) - /* Sanity check */ - HDassert(file); - HDassert(file->ioc_file); - HDassert(file->ioc_file); + /* Sanity check */ + HDassert(file); + HDassert(file->ioc_file); + HDassert(file->ioc_file); - if (H5FD_set_eoa(file->ioc_file, type, addr) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTSET, FAIL, "H5FDset_eoa failed for R/W file") + if (H5FD_set_eoa(file->ioc_file, type, addr) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTSET, FAIL, "H5FDset_eoa failed for R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_set_eoa() */ /*------------------------------------------------------------------------- @@ -1051,24 +1007,32 @@ done: * Failure: HADDR_UNDEF *------------------------------------------------------------------------- */ -static haddr_t H5FD__ioc_get_eof(const H5FD_t *_file, - H5FD_mem_t H5_ATTR_UNUSED type) { - const H5FD_ioc_t *file = (const H5FD_ioc_t *)_file; - haddr_t ret_value = HADDR_UNDEF; /* Return value */ +static haddr_t +H5FD__ioc_get_eof(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type) +{ + const H5FD_ioc_t * file = (const H5FD_ioc_t *)_file; + haddr_t ret_value = HADDR_UNDEF; /* Return value */ + subfiling_context_t *sf_context = NULL; + + FUNC_ENTER_STATIC - FUNC_ENTER_STATIC + H5FD_IOC_LOG_CALL(FUNC); - H5FD_IOC_LOG_CALL(FUNC); + /* Sanity check */ + HDassert(file); + HDassert(file->ioc_file); - /* Sanity check */ - HDassert(file); - HDassert(file->ioc_file); + sf_context = get__subfiling_object(file->fa.common.context_id); + if (sf_context) { + ret_value = (haddr_t)sf_context->sf_eof; + goto done; + } - if (HADDR_UNDEF == (ret_value = H5FD_get_eof(file->ioc_file, type))) - HGOTO_ERROR(H5E_VFL, H5E_CANTGET, HADDR_UNDEF, "unable to get eof") + if (HADDR_UNDEF == (ret_value = H5FD_get_eof(file->ioc_file, type))) + HGOTO_ERROR(H5E_VFL, H5E_CANTGET, HADDR_UNDEF, "unable to get eof") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_get_eof */ /*------------------------------------------------------------------------- @@ -1079,23 +1043,24 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_truncate(H5FD_t *_file, hid_t dxpl_id, - hbool_t closing) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_truncate(H5FD_t *_file, hid_t dxpl_id, hbool_t closing) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - HDassert(file); - HDassert(file->ioc_file); - HDassert(file->ioc_file); + HDassert(file); + HDassert(file->ioc_file); + HDassert(file->ioc_file); - if (H5FDtruncate(file->ioc_file, dxpl_id, closing) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTUPDATE, FAIL, "unable to truncate R/W file") + if (H5FDtruncate(file->ioc_file, dxpl_id, closing) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTUPDATE, FAIL, "unable to truncate R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_truncate */ /*------------------------------------------------------------------------- @@ -1112,22 +1077,24 @@ done: * NOTE: no public API for H5FD_sb_size, it needs to be added *------------------------------------------------------------------------- */ -static hsize_t H5FD__ioc_sb_size(H5FD_t *_file) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - hsize_t ret_value = 0; +static hsize_t +H5FD__ioc_sb_size(H5FD_t *_file) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + hsize_t ret_value = 0; - FUNC_ENTER_STATIC_NOERR + FUNC_ENTER_STATIC_NOERR - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Sanity check */ - HDassert(file); - HDassert(file->ioc_file); + /* Sanity check */ + HDassert(file); + HDassert(file->ioc_file); - if (file->ioc_file) - ret_value = H5FD_sb_size(file->ioc_file); + if (file->ioc_file) + ret_value = H5FD_sb_size(file->ioc_file); - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_sb_size */ /*------------------------------------------------------------------------- @@ -1138,25 +1105,25 @@ static hsize_t H5FD__ioc_sb_size(H5FD_t *_file) { * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_sb_encode(H5FD_t *_file, char *name /*out*/, - unsigned char *buf /*out*/) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_sb_encode(H5FD_t *_file, char *name /*out*/, unsigned char *buf /*out*/) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Sanity check */ - HDassert(file); - HDassert(file->ioc_file); + /* Sanity check */ + HDassert(file); + HDassert(file->ioc_file); - if (file->ioc_file && H5FD_sb_encode(file->ioc_file, name, buf) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTENCODE, FAIL, - "unable to encode the superblock in R/W file") + if (file->ioc_file && H5FD_sb_encode(file->ioc_file, name, buf) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTENCODE, FAIL, "unable to encode the superblock in R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_sb_encode */ /*------------------------------------------------------------------------- @@ -1169,25 +1136,25 @@ done: * NOTE: no public API for H5FD_sb_size, need to add *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_sb_decode(H5FD_t *_file, const char *name, - const unsigned char *buf) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_sb_decode(H5FD_t *_file, const char *name, const unsigned char *buf) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Sanity check */ - HDassert(file); - HDassert(file->ioc_file); + /* Sanity check */ + HDassert(file); + HDassert(file->ioc_file); - if (H5FD_sb_load(file->ioc_file, name, buf) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTDECODE, FAIL, - "unable to decode the superblock in R/W file") + if (H5FD_sb_load(file->ioc_file, name, buf) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTDECODE, FAIL, "unable to decode the superblock in R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_sb_decode */ /*------------------------------------------------------------------------- @@ -1199,21 +1166,23 @@ done: * Failure: Must never fail *------------------------------------------------------------------------- */ -static int H5FD__ioc_cmp(const H5FD_t *_f1, const H5FD_t *_f2) { - const H5FD_ioc_t *f1 = (const H5FD_ioc_t *)_f1; - const H5FD_ioc_t *f2 = (const H5FD_ioc_t *)_f2; - herr_t ret_value = 0; /* Return value */ +static int +H5FD__ioc_cmp(const H5FD_t *_f1, const H5FD_t *_f2) +{ + const H5FD_ioc_t *f1 = (const H5FD_ioc_t *)_f1; + const H5FD_ioc_t *f2 = (const H5FD_ioc_t *)_f2; + herr_t ret_value = 0; /* Return value */ - FUNC_ENTER_STATIC_NOERR + FUNC_ENTER_STATIC_NOERR - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - HDassert(f1); - HDassert(f2); + HDassert(f1); + HDassert(f2); - ret_value = H5FD_cmp(f1->ioc_file, f2->ioc_file); + ret_value = H5FD_cmp(f1->ioc_file, f2->ioc_file); - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_cmp */ /*-------------------------------------------------------------------------- @@ -1225,26 +1194,26 @@ static int H5FD__ioc_cmp(const H5FD_t *_f1, const H5FD_t *_f2) { * Return: SUCCEED/FAIL *-------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_get_handle(H5FD_t *_file, hid_t H5_ATTR_UNUSED fapl, - void **file_handle) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_get_handle(H5FD_t *_file, hid_t H5_ATTR_UNUSED fapl, void **file_handle) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Check arguments */ - HDassert(file); - HDassert(file->ioc_file); - HDassert(file_handle); + /* Check arguments */ + HDassert(file); + HDassert(file->ioc_file); + HDassert(file_handle); - if (H5FD_get_vfd_handle(file->ioc_file, file->fa.common.ioc_fapl_id, - file_handle) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "unable to get handle of R/W file") + if (H5FD_get_vfd_handle(file->ioc_file, file->fa.common.ioc_fapl_id, file_handle) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "unable to get handle of R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_get_handle */ /*-------------------------------------------------------------------------- @@ -1255,23 +1224,32 @@ done: * Return: SUCCEED/FAIL *-------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_lock(H5FD_t *_file, hbool_t rw) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_lock(H5FD_t *_file, hbool_t H5_ATTR_UNUSED rw) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - HDassert(file); - HDassert(file->ioc_file); + HDassert(file); + HDassert(file->ioc_file); - /* Place the lock on each file */ - if (H5FD_lock(file->ioc_file, rw) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTLOCKFILE, FAIL, "unable to lock R/W file") +#if 1 + if (HDflock(file->hdf_fd_dup, LOCK_SH) < 0) { + perror("flock"); + HGOTO_ERROR(H5E_VFL, H5E_CANTLOCKFILE, FAIL, "unable to lock R/W file") + } +#else + /* Place the lock on each file */ + if (H5FD_lock(file->ioc_file, rw) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTLOCKFILE, FAIL, "unable to lock R/W file") +#endif done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_lock */ /*-------------------------------------------------------------------------- @@ -1282,29 +1260,31 @@ done: * Return: SUCCEED/FAIL *-------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_unlock(H5FD_t *_file) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ - herr_t ret_value = SUCCEED; /* Return value */ - - FUNC_ENTER_STATIC - - H5FD_IOC_LOG_CALL(FUNC); +static herr_t +H5FD__ioc_unlock(H5FD_t *_file) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ + herr_t ret_value = SUCCEED; /* Return value */ - /* Check arguments */ - HDassert(file); - HDassert(file->ioc_file); + FUNC_ENTER_STATIC - /* Remove the lock on each file */ - if (H5FD_unlock(file->ioc_file) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTUNLOCKFILE, FAIL, "unable to unlock R/W file") + H5FD_IOC_LOG_CALL(FUNC); - if (file->ioc_file != NULL) - if (H5FD_unlock(file->ioc_file) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTUNLOCKFILE, FAIL, - "unable to unlock W/O file") + /* Check arguments */ + HDassert(file); +#if 1 + if (HDflock(file->hdf_fd_dup, LOCK_UN) < 0) { + perror("flock"); + HGOTO_ERROR(H5E_VFL, H5E_CANTLOCKFILE, FAIL, "unable to lock R/W file") + } +#else + if (file->ioc_file != NULL) + if (H5FD_unlock(file->ioc_file) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTUNLOCKFILE, FAIL, "unable to unlock W/O file") +#endif done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_unlock */ /*------------------------------------------------------------------------- @@ -1316,33 +1296,34 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_query(const H5FD_t *_file, - unsigned long *flags /* out */) { - const H5FD_ioc_t *file_ptr = (const H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; - - FUNC_ENTER_STATIC - - H5FD_IOC_LOG_CALL(FUNC); - - if (file_ptr == NULL) { - if (flags) - *flags = 0; - } - else if (file_ptr->ioc_file) { - if (H5FDquery(file_ptr->ioc_file, flags) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTLOCK, FAIL, "unable to query R/W file"); - } - else { - /* There is no file. Because this is a pure passthrough VFD, - * it has no features of its own. - */ - if (flags) - *flags = 0; - } +static herr_t +H5FD__ioc_query(const H5FD_t *_file, unsigned long *flags /* out */) +{ + const H5FD_ioc_t *file_ptr = (const H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; + + FUNC_ENTER_STATIC + + H5FD_IOC_LOG_CALL(FUNC); + + if (file_ptr == NULL) { + if (flags) + *flags = 0; + } + else if (file_ptr->ioc_file) { + if (H5FDquery(file_ptr->ioc_file, flags) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTLOCK, FAIL, "unable to query R/W file"); + } + else { + /* There is no file. Because this is a pure passthrough VFD, + * it has no features of its own. + */ + if (flags) + *flags = 0; + } done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_query() */ /*------------------------------------------------------------------------- @@ -1353,28 +1334,27 @@ done: * Return: Address of allocated space (HADDR_UNDEF if error). *------------------------------------------------------------------------- */ -static haddr_t H5FD__ioc_alloc(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, - hsize_t size) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ - haddr_t ret_value = HADDR_UNDEF; /* Return value */ +static haddr_t +H5FD__ioc_alloc(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, hsize_t size) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ + haddr_t ret_value = HADDR_UNDEF; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Check arguments */ - HDassert(file); - HDassert(file->ioc_file); + /* Check arguments */ + HDassert(file); + HDassert(file->ioc_file); - /* Allocate memory for each file, only return the return value for R/W file. - */ - if ((ret_value = H5FDalloc(file->ioc_file, type, dxpl_id, size)) == - HADDR_UNDEF) - HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, HADDR_UNDEF, - "unable to allocate for R/W file") + /* Allocate memory for each file, only return the return value for R/W file. + */ + if ((ret_value = H5FDalloc(file->ioc_file, type, dxpl_id, size)) == HADDR_UNDEF) + HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, HADDR_UNDEF, "unable to allocate for R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_alloc() */ /*------------------------------------------------------------------------- @@ -1385,25 +1365,26 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_get_type_map(const H5FD_t *_file, - H5FD_mem_t *type_map) { - const H5FD_ioc_t *file = (const H5FD_ioc_t *)_file; - herr_t ret_value = SUCCEED; +static herr_t +H5FD__ioc_get_type_map(const H5FD_t *_file, H5FD_mem_t *type_map) +{ + const H5FD_ioc_t *file = (const H5FD_ioc_t *)_file; + herr_t ret_value = SUCCEED; - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Check arguments */ - HDassert(file); - HDassert(file->ioc_file); + /* Check arguments */ + HDassert(file); + HDassert(file->ioc_file); - /* Retrieve memory type mapping for R/W channel only */ - if (H5FD_get_fs_type_map(file->ioc_file, type_map) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "unable to allocate for R/W file") + /* Retrieve memory type mapping for R/W channel only */ + if (H5FD_get_fs_type_map(file->ioc_file, type_map) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "unable to allocate for R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_get_type_map() */ /*------------------------------------------------------------------------- @@ -1414,27 +1395,35 @@ done: * Return: SUCCEED/FAIL *------------------------------------------------------------------------- */ -static herr_t H5FD__ioc_free(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, - haddr_t addr, hsize_t size) { - H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ - herr_t ret_value = SUCCEED; /* Return value */ +static herr_t +H5FD__ioc_free(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, haddr_t addr, hsize_t size) +{ + H5FD_ioc_t *file = (H5FD_ioc_t *)_file; /* VFD file struct */ + herr_t ret_value = SUCCEED; /* Return value */ - FUNC_ENTER_STATIC + FUNC_ENTER_STATIC - H5FD_IOC_LOG_CALL(FUNC); + H5FD_IOC_LOG_CALL(FUNC); - /* Check arguments */ - HDassert(file); - HDassert(file->ioc_file); + /* Check arguments */ + HDassert(file); + HDassert(file->ioc_file); - if (H5FDfree(file->ioc_file, type, dxpl_id, addr, size) < 0) - HGOTO_ERROR(H5E_VFL, H5E_CANTFREE, FAIL, "unable to free for R/W file") + if (H5FDfree(file->ioc_file, type, dxpl_id, addr, size) < 0) + HGOTO_ERROR(H5E_VFL, H5E_CANTFREE, FAIL, "unable to free for R/W file") done: - FUNC_LEAVE_NOAPI(ret_value) + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD__ioc_free() */ +void +H5FD_ioc_wait_thread_main(void) +{ + return; +} -void H5FD_ioc_wait_thread_main(void) { return; } - -void H5FD_ioc_finalize_threads(void) { return; } +void +H5FD_ioc_finalize_threads(void) +{ + return; +} diff --git a/src/H5FDioc.h b/src/H5FDioc.h index aeee9e1..119e004 100644 --- a/src/H5FDioc.h +++ b/src/H5FDioc.h @@ -23,14 +23,14 @@ #define H5FD_IOC (H5FD_ioc_init()) #ifndef H5FD_IOC_FAPL_T_MAGIC -#define H5FD_CURR_IOC_FAPL_T_VERSION 1 -#define H5FD_IOC_FAPL_T_MAGIC 0xFED21331 +#define H5FD_CURR_IOC_FAPL_T_VERSION 1 +#define H5FD_IOC_FAPL_T_MAGIC 0xFED21331 #endif /* Maximum length of a filename/path string in the Write-Only channel, * including the NULL-terminator. */ -#define H5FD_IOC_PATH_MAX 4096 +#define H5FD_IOC_PATH_MAX 4096 #define H5FD_IOC_THREAD_POOL_SIZE 4 /* @@ -40,11 +40,11 @@ * lastly, defining a fixed number. */ typedef enum { - SELECT_IOC_ONE_PER_NODE = 0, /* Default */ - SELECT_IOC_EVERY_NTH_RANK, /* Starting at rank 0, select-next += N */ - SELECT_IOC_WITH_CONFIG, /* NOT IMPLEMENTED: Read-from-file */ - SELECT_IOC_TOTAL, /* Starting at rank 0, mpi_size / total */ - ioc_selection_options /* (Uses same selection as every Nth rank) */ + SELECT_IOC_ONE_PER_NODE = 0, /* Default */ + SELECT_IOC_EVERY_NTH_RANK, /* Starting at rank 0, select-next += N */ + SELECT_IOC_WITH_CONFIG, /* NOT IMPLEMENTED: Read-from-file */ + SELECT_IOC_TOTAL, /* Starting at rank 0, mpi_size / total */ + ioc_selection_options /* (Uses same selection as every Nth rank) */ } ioc_selection_t; /* @@ -60,40 +60,81 @@ typedef enum { #define H5FD_SUBFILING_PATH_MAX 4096 typedef struct config_common_t { - uint32_t magic; /* set to H5FD_SUBFILING_FAPL_T_MAGIC */ - uint32_t version; /* set to H5FD_CURR_SUBFILING_FAPL_T_VERSION */ - int32_t stripe_count; /* How many io concentrators */ - int64_t stripe_depth; /* Max # of bytes in contigious IO to an IOC */ - ioc_selection_t ioc_selection; /* Method to select IO Concentrators */ - hid_t ioc_fapl_id; /* The hid_t value of the stacked VFD */ - int64_t context_id; /* The value used to lookup an IOC context */ - char file_dir[H5FD_SUBFILING_PATH_MAX + 1]; /* Directory where we find files */ - char file_path[H5FD_SUBFILING_PATH_MAX + 1]; /* The user define filename */ + uint32_t magic; /* set to H5FD_SUBFILING_FAPL_T_MAGIC */ + uint32_t version; /* set to H5FD_CURR_SUBFILING_FAPL_T_VERSION */ + int32_t stripe_count; /* How many io concentrators */ + int64_t stripe_depth; /* Max # of bytes in contigious IO to an IOC */ + ioc_selection_t ioc_selection; /* Method to select IO Concentrators */ + hid_t ioc_fapl_id; /* The hid_t value of the stacked VFD */ + int64_t context_id; /* The value used to lookup an IOC context */ + char file_dir[H5FD_SUBFILING_PATH_MAX + 1]; /* Directory where we find files */ + char file_path[H5FD_SUBFILING_PATH_MAX + 1]; /* The user defined filename */ } config_common_t; - typedef struct H5FD_ioc_config_t { config_common_t common; int32_t thread_pool_count; } H5FD_ioc_config_t; +/* The information of this ioc */ +typedef struct H5FD_ioc_t { + H5FD_t pub; /* public stuff, must be first */ + int fd; /* the filesystem file descriptor */ + + H5FD_ioc_config_t fa; /* driver-specific file access properties */ + int mpi_rank; + int mpi_size; + H5FD_t * ioc_file; /* native HDF5 file pointer (sec2) */ + +#ifndef H5_HAVE_WIN32_API + /* On most systems the combination of device and i-node number uniquely + * identify a file. Note that Cygwin, MinGW and other Windows POSIX + * environments have the stat function (which fakes inodes) + * and will use the 'device + inodes' scheme as opposed to the + * Windows code further below. + */ + dev_t device; /* file device number */ + ino_t inode; /* file i-node number */ +#else + /* Files in windows are uniquely identified by the volume serial + * number and the file index (both low and high parts). + * + * There are caveats where these numbers can change, especially + * on FAT file systems. On NTFS, however, a file should keep + * those numbers the same until renamed or deleted (though you + * can use ReplaceFile() on NTFS to keep the numbers the same + * while renaming). + * + * See the MSDN "BY_HANDLE_FILE_INFORMATION Structure" entry for + * more information. + * + * http://msdn.microsoft.com/en-us/library/aa363788(v=VS.85).aspx + */ + DWORD nFileIndexLow; + DWORD nFileIndexHigh; + DWORD dwVolumeSerialNumber; + + HANDLE hFile; /* Native windows file handle */ +#endif /* H5_HAVE_WIN32_API */ + int hdf_fd_dup; +} H5FD_ioc_t; + #ifdef __cplusplus extern "C" { #endif -H5_DLL hid_t H5FD_ioc_init(void); +H5_DLL hid_t H5FD_ioc_init(void); H5_DLL herr_t H5Pset_fapl_ioc(hid_t fapl_id, H5FD_ioc_config_t *config_ptr); H5_DLL herr_t H5Pget_fapl_ioc(hid_t fapl_id, H5FD_ioc_config_t *config_ptr); -H5_DLL void H5FD_ioc_set_shutdown_flag(int flag); -H5_DLL void H5FD_ioc_wait_thread_main(void); -H5_DLL void H5FD_ioc_finalize_threads(void); -H5_DLL int initialize_ioc_threads(void *_sf_context); -H5_DLL int tpool_add_work(void *work); -H5_DLL void begin_thread_exclusive(void); -H5_DLL void end_thread_exclusive(void); +H5_DLL void H5FD_ioc_set_shutdown_flag(int flag); +H5_DLL void H5FD_ioc_wait_thread_main(void); +H5_DLL void H5FD_ioc_finalize_threads(void); +H5_DLL int initialize_ioc_threads(void *_sf_context); +H5_DLL int tpool_add_work(void *work); +H5_DLL void begin_thread_exclusive(void); +H5_DLL void end_thread_exclusive(void); #ifdef __cplusplus } #endif #endif - diff --git a/src/H5FDioc_threads.c b/src/H5FDioc_threads.c index 6a4536a..1db8c62 100644 --- a/src/H5FDioc_threads.c +++ b/src/H5FDioc_threads.c @@ -26,10 +26,10 @@ * use mercury for that purpose... */ -static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; +static hg_thread_mutex_t ioc_mutex = PTHREAD_MUTEX_INITIALIZER; static hg_thread_mutex_t ioc_thread_mutex = PTHREAD_MUTEX_INITIALIZER; -static hg_thread_pool_t *ioc_thread_pool = NULL; -static hg_thread_t ioc_thread; +static hg_thread_pool_t *ioc_thread_pool = NULL; +static hg_thread_t ioc_thread; #ifndef HG_TEST_NUM_THREADS_DEFAULT #define HG_TEST_NUM_THREADS_DEFAULT 4 @@ -37,8 +37,8 @@ static hg_thread_t ioc_thread; extern int ioc_main(int64_t context_id); -static int pool_concurrent_max = 0; -static struct hg_thread_work *pool_request = NULL; +static int pool_concurrent_max = 0; +static struct hg_thread_work *pool_request = NULL; /*------------------------------------------------------------------------- * Function: local ioc_thread_main @@ -58,16 +58,18 @@ static struct hg_thread_work *pool_request = NULL; * *------------------------------------------------------------------------- */ -static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg) { - int64_t *context_id = (int64_t *)arg; - hg_thread_ret_t thread_ret = (hg_thread_ret_t)0; +static HG_THREAD_RETURN_TYPE +ioc_thread_main(void *arg) +{ + int64_t * context_id = (int64_t *)arg; + hg_thread_ret_t thread_ret = (hg_thread_ret_t)0; - /* Pass along the subfiling_context_t */ - ioc_main(context_id[0]); + /* Pass along the subfiling_context_t */ + ioc_main(context_id[0]); - /* Upon exit, we can free the input arg */ - free(arg); - return thread_ret; + /* Upon exit, we can free the input arg */ + free(arg); + return thread_ret; } /*------------------------------------------------------------------------- @@ -90,100 +92,103 @@ static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg) { * *------------------------------------------------------------------------- */ -int initialize_ioc_threads(void *_sf_context) { - int status; - int file_open_count; - subfiling_context_t *sf_context = _sf_context; - unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT; - int64_t *context_id = (int64_t *)malloc(sizeof(int64_t)); - int world_size = sf_context->topology->app_layout->world_size; - size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work)); - char *envValue; - double t_start = 0.0, t_end = 0.0; - - assert(context_id != NULL); - - file_open_count = atomic_load(&sf_file_open_count); - atomic_fetch_add(&sf_file_open_count, 1); - - if (file_open_count > 0) - return 0; +int +initialize_ioc_threads(void *_sf_context) +{ + int status; + int file_open_count; + subfiling_context_t *sf_context = _sf_context; + unsigned int thread_pool_count = HG_TEST_NUM_THREADS_DEFAULT; + int64_t * context_id = (int64_t *)malloc(sizeof(int64_t)); + int world_size = sf_context->topology->app_layout->world_size; + size_t alloc_size = ((size_t)world_size * sizeof(struct hg_thread_work)); + char * envValue; + double t_start = 0.0, t_end = 0.0; + + assert(context_id != NULL); + + file_open_count = atomic_load(&sf_file_open_count); + atomic_fetch_add(&sf_file_open_count, 1); + + if (file_open_count > 0) + return 0; + + t_start = MPI_Wtime(); + + /* Initialize the main IOC thread input argument. + * Each IOC request will utilize this context_id which is + * consistent across all MPI ranks, to ensure that requests + * involving reference counting are correctly using the + * correct file contexts. + */ + context_id[0] = sf_context->sf_context_id; + + if (pool_request == NULL) { + if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) { + perror("malloc error"); + return -1; + } + else + pool_concurrent_max = world_size; + } + + memset(pool_request, 0, alloc_size); + + /* Initialize a couple of mutex variables that are used + * during IO concentrator operations to serialize + * access to key objects, e.g. reference counting. + */ + status = hg_thread_mutex_init(&ioc_mutex); + if (status) { + puts("hg_thread_mutex_init failed"); + goto err_exit; + } + status = hg_thread_mutex_init(&ioc_thread_mutex); + if (status) { + puts("hg_thread_mutex_init failed"); + goto err_exit; + } - t_start = MPI_Wtime(); - - /* Initialize the main IOC thread input argument. - * Each IOC request will utilize this context_id which is - * consistent across all MPI ranks, to ensure that requests - * involving reference counting are correctly using the - * correct file contexts. - */ - context_id[0] = sf_context->sf_context_id; - - if (pool_request == NULL) { - if ((pool_request = (struct hg_thread_work *)malloc(alloc_size)) == NULL) { - perror("malloc error"); - return -1; - } else - pool_concurrent_max = world_size; - } - - memset(pool_request, 0, alloc_size); - - /* Initialize a couple of mutex variables that are used - * during IO concentrator operations to serialize - * access to key objects, e.g. reference counting. - */ - status = hg_thread_mutex_init(&ioc_mutex); - if (status) { - puts("hg_thread_mutex_init failed"); - goto err_exit; - } - status = hg_thread_mutex_init(&ioc_thread_mutex); - if (status) { - puts("hg_thread_mutex_init failed"); - goto err_exit; - } - - /* Allow experimentation with the number of helper threads */ - if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) { - int value_check = atoi(envValue); - if (value_check > 0) { - thread_pool_count = (unsigned int)value_check; + /* Allow experimentation with the number of helper threads */ + if ((envValue = getenv("IOC_THREAD_POOL_COUNT")) != NULL) { + int value_check = atoi(envValue); + if (value_check > 0) { + thread_pool_count = (unsigned int)value_check; + } + } + + /* Initialize a thread pool for the IO Concentrator to use */ + status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool); + if (status) { + puts("hg_thread_pool_init failed"); + goto err_exit; + } + + /* Arguments to hg_thread_create are: + * 1. A pointer to reference the created thread. + * 2. User function pointer for the new thread to execute. + * 3. Pointer to the input argument that gets passed along to the user + * function. + */ + status = hg_thread_create(&ioc_thread, ioc_thread_main, context_id); + if (status) { + puts("hg_thread_create failed"); + goto err_exit; } - } - - /* Initialize a thread pool for the IO Concentrator to use */ - status = hg_thread_pool_init(thread_pool_count, &ioc_thread_pool); - if (status) { - puts("hg_thread_pool_init failed"); - goto err_exit; - } - - /* Arguments to hg_thread_create are: - * 1. A pointer to reference the created thread. - * 2. User function pointer for the new thread to execute. - * 3. Pointer to the input argument that gets passed along to the user - * function. - */ - status = hg_thread_create(&ioc_thread, ioc_thread_main, context_id); - if (status) { - puts("hg_thread_create failed"); - goto err_exit; - } #ifndef NDEBUG - t_end = MPI_Wtime(); - if (sf_verbose_flag) { - if (sf_context->topology->subfile_rank == 0) { - HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start)); - HDfflush(stdout); + t_end = MPI_Wtime(); + if (sf_verbose_flag) { + if (sf_context->topology->subfile_rank == 0) { + HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start)); + HDfflush(stdout); + } } - } #endif - return 0; + return 0; err_exit: - return -1; + return -1; } /*------------------------------------------------------------------------- @@ -202,30 +207,38 @@ err_exit: * *------------------------------------------------------------------------- */ -void __attribute__((destructor)) finalize_ioc_threads(void) { - if (ioc_thread_pool != NULL) { - hg_thread_pool_destroy(ioc_thread_pool); - ioc_thread_pool = NULL; - } +void __attribute__((destructor)) finalize_ioc_threads(void) +{ + if (ioc_thread_pool != NULL) { + hg_thread_pool_destroy(ioc_thread_pool); + ioc_thread_pool = NULL; + } } -static const char *translate_opcode(io_op_t op) +static const char * +translate_opcode(io_op_t op) { - switch(op) { - case READ_OP: return "READ_OP"; - break; - case WRITE_OP: return "WRITE_OP"; - break; - case OPEN_OP: return "OPEN_OP"; - break; - case CLOSE_OP: return "CLOSE_OP"; - break; - case FINI_OP: return "FINI_OP"; - break; - case LOGGING_OP: return "LOGGING_OP"; - break; - } - return "unknown"; + switch (op) { + case READ_OP: + return "READ_OP"; + break; + case WRITE_OP: + return "WRITE_OP"; + break; + case OPEN_OP: + return "OPEN_OP"; + break; + case CLOSE_OP: + return "CLOSE_OP"; + break; + case FINI_OP: + return "FINI_OP"; + break; + case LOGGING_OP: + return "LOGGING_OP"; + break; + } + return "unknown"; } /*------------------------------------------------------------------------- * Function: local: handle_work_request @@ -249,45 +262,44 @@ static const char *translate_opcode(io_op_t op) * *------------------------------------------------------------------------- */ -static HG_THREAD_RETURN_TYPE handle_work_request(void *arg) +static HG_THREAD_RETURN_TYPE +handle_work_request(void *arg) { - int status = 0; - hg_thread_ret_t ret = 0; - sf_work_request_t *msg = (sf_work_request_t *)arg; - int64_t file_context_id = msg->header[2]; - subfiling_context_t *sf_context = NULL; - - sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - atomic_fetch_add(&sf_work_pending, 1); // atomic - switch (msg->tag) { - case WRITE_INDEP: - status = queue_write_indep(msg, msg->subfile_rank, msg->source, - sf_context->sf_data_comm); - break; - case READ_INDEP: - status = queue_read_indep(msg, msg->subfile_rank, msg->source, - sf_context->sf_data_comm); - break; - default: - HDprintf("[ioc(%d)] received message tag(%x)from rank %d\n", - msg->subfile_rank, msg->tag, msg->source); - status = -1; - break; - } - fflush(stdout); - - atomic_fetch_sub(&sf_work_pending, 1); // atomic - if (status < 0) { - HDprintf("[ioc(%d) %s]: request(%s) filename=%s from " - "rank(%d), size=%ld, offset=%ld FAILED\n", - msg->subfile_rank, __func__, translate_opcode((io_op_t)msg->tag), - sf_context->filename, msg->source, msg->header[0], msg->header[1]); - + int status = 0; + hg_thread_ret_t ret = 0; + sf_work_request_t * msg = (sf_work_request_t *)arg; + int64_t file_context_id = msg->header[2]; + subfiling_context_t *sf_context = NULL; + + sf_context = get__subfiling_object(file_context_id); + assert(sf_context != NULL); + + atomic_fetch_add(&sf_work_pending, 1); // atomic + switch (msg->tag) { + case WRITE_INDEP: + status = queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm); + break; + case READ_INDEP: + status = queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm); + break; + default: + HDprintf("[ioc(%d)] received message tag(%x)from rank %d\n", msg->subfile_rank, msg->tag, + msg->source); + status = -1; + break; + } fflush(stdout); - } - return ret; + + atomic_fetch_sub(&sf_work_pending, 1); // atomic + if (status < 0) { + HDprintf("[ioc(%d) %s]: request(%s) filename=%s from " + "rank(%d), size=%ld, offset=%ld FAILED\n", + msg->subfile_rank, __func__, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename, + msg->source, msg->header[0], msg->header[1]); + + fflush(stdout); + } + return ret; } /*------------------------------------------------------------------------- @@ -306,18 +318,20 @@ static HG_THREAD_RETURN_TYPE handle_work_request(void *arg) * *------------------------------------------------------------------------- */ -int tpool_add_work(void *_work) { - static int work_index = 0; - sf_work_request_t *work = (sf_work_request_t *)_work; - - hg_thread_mutex_lock(&ioc_mutex); - if (work_index == pool_concurrent_max) - work_index = 0; - pool_request[work_index].func = handle_work_request; - pool_request[work_index].args = work; - hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]); - hg_thread_mutex_unlock(&ioc_mutex); - return 0; +int +tpool_add_work(void *_work) +{ + static int work_index = 0; + sf_work_request_t *work = (sf_work_request_t *)_work; + + hg_thread_mutex_lock(&ioc_mutex); + if (work_index == pool_concurrent_max) + work_index = 0; + pool_request[work_index].func = handle_work_request; + pool_request[work_index].args = work; + hg_thread_pool_post(ioc_thread_pool, &pool_request[work_index++]); + hg_thread_mutex_unlock(&ioc_mutex); + return 0; } /*------------------------------------------------------------------------- @@ -335,7 +349,11 @@ int tpool_add_work(void *_work) { * *------------------------------------------------------------------------- */ -bool tpool_is_empty(void) { return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); } +bool +tpool_is_empty(void) +{ + return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); +} /*------------------------------------------------------------------------- * Function: begin_thread_exclusive @@ -351,8 +369,10 @@ bool tpool_is_empty(void) { return HG_QUEUE_IS_EMPTY(&ioc_thread_pool->queue); } * *------------------------------------------------------------------------- */ -void begin_thread_exclusive(void) { - hg_thread_mutex_lock(&ioc_thread_mutex); +void +begin_thread_exclusive(void) +{ + hg_thread_mutex_lock(&ioc_thread_mutex); } /*------------------------------------------------------------------------- @@ -370,8 +390,10 @@ void begin_thread_exclusive(void) { * *------------------------------------------------------------------------- */ -void end_thread_exclusive(void) { - hg_thread_mutex_unlock(&ioc_thread_mutex); +void +end_thread_exclusive(void) +{ + hg_thread_mutex_unlock(&ioc_thread_mutex); } /*------------------------------------------------------------------------- @@ -389,9 +411,11 @@ void end_thread_exclusive(void) { * *------------------------------------------------------------------------- */ -int wait_for_thread_main(void) { - if (hg_thread_join(ioc_thread) != 0) { - return -1; - } - return 0; +int +wait_for_thread_main(void) +{ + if (hg_thread_join(ioc_thread) != 0) { + return -1; + } + return 0; } diff --git a/src/H5FDsubfile_int.c b/src/H5FDsubfile_int.c index 8abda5e..b99ea36 100644 --- a/src/H5FDsubfile_int.c +++ b/src/H5FDsubfile_int.c @@ -39,7 +39,6 @@ Private functions ========================================= */ - /* -------------------------------------------------------------------------- sf_context_limit -- How many contexts can be recorded (default = 4) @@ -48,11 +47,11 @@ sf_context_cache -- Storage for contexts -------------------------------------------------------------------------- */ // static size_t twoGIG_LIMIT = (1 << 30); -static size_t sf_context_limit = 16; -static subfiling_context_t *sf_context_cache = NULL; -static size_t sf_topology_limit = 4; -static sf_topology_t *sf_topology_cache = NULL; -static app_layout_t *sf_app_layout = NULL; +static size_t sf_context_limit = 16; +static subfiling_context_t *sf_context_cache = NULL; +static size_t sf_topology_limit = 4; +static sf_topology_t * sf_topology_cache = NULL; +static app_layout_t * sf_app_layout = NULL; static file_map_to_context_t *sf_open_file_map = NULL; static int sf_file_map_size = 0; @@ -64,19 +63,18 @@ static int sf_file_map_size = 0; --------------------------------------- */ static stat_record_t subfiling_stats[TOTAL_STAT_COUNT]; -#define SF_WRITE_OPS (subfiling_stats[WRITE_STAT].op_count) -#define SF_WRITE_TIME (subfiling_stats[WRITE_STAT].total/(double)subfiling_stats[WRITE_STAT].op_count) -#define SF_WRITE_WAIT_TIME (subfiling_stats[WRITE_WAIT].total/(double)subfiling_stats[WRITE_WAIT].op_count) -#define SF_READ_OPS (subfiling_stats[READ_STAT].op_count) -#define SF_READ_TIME (subfiling_stats[READ_STAT].total/(double)subfiling_stats[READ_STAT].op_count) -#define SF_READ_WAIT_TIME (subfiling_stats[READ_WAIT].total/(double)subfiling_stats[READ_WAIT].op_count) -#define SF_QUEUE_DELAYS (subfiling_stats[QUEUE_STAT].total) - +#define SF_WRITE_OPS (subfiling_stats[WRITE_STAT].op_count) +#define SF_WRITE_TIME (subfiling_stats[WRITE_STAT].total / (double)subfiling_stats[WRITE_STAT].op_count) +#define SF_WRITE_WAIT_TIME (subfiling_stats[WRITE_WAIT].total / (double)subfiling_stats[WRITE_WAIT].op_count) +#define SF_READ_OPS (subfiling_stats[READ_STAT].op_count) +#define SF_READ_TIME (subfiling_stats[READ_STAT].total / (double)subfiling_stats[READ_STAT].op_count) +#define SF_READ_WAIT_TIME (subfiling_stats[READ_WAIT].total / (double)subfiling_stats[READ_WAIT].op_count) +#define SF_QUEUE_DELAYS (subfiling_stats[QUEUE_STAT].total) static void maybe_initialize_statistics(void) { - memset(subfiling_stats, 0, sizeof(subfiling_stats)); + memset(subfiling_stats, 0, sizeof(subfiling_stats)); } static void clear_fid_map_entry(uint64_t sf_fid); @@ -87,9 +85,6 @@ Public functions ========================================= */ - - - /* ------------------------------------------------------------------------- Programmer: Richard Warren @@ -118,7 +113,7 @@ Public functions void * get__subfiling_object(int64_t object_id) { - int obj_type = (int) ((object_id >> 32) & 0x0FFFF); + int obj_type = (int)((object_id >> 32) & 0x0FFFF); /* We don't require a large indexing space * 16 bits should be enough.. */ @@ -130,16 +125,17 @@ get__subfiling_object(int64_t object_id) * nodes along with the number of MPI ranks on a node. */ if (sf_topology_cache == NULL) { - sf_topology_cache = (sf_topology_t *) calloc( - sf_topology_limit, sizeof(sf_topology_t)); + sf_topology_cache = (sf_topology_t *)calloc(sf_topology_limit, sizeof(sf_topology_t)); assert(sf_topology_cache != NULL); } if (index < sf_topology_limit) { - return (void *) &sf_topology_cache[index]; - } else { + return (void *)&sf_topology_cache[index]; + } + else { HDputs("Illegal toplogy object index"); } - } else if (obj_type == SF_CONTEXT) { + } + else if (obj_type == SF_CONTEXT) { /* Contexts provide information principally about * the application and how the data layout is managed * over some number of sub-files. The important @@ -150,27 +146,25 @@ get__subfiling_object(int64_t object_id) * to facilitate the communication of IO requests. */ if (sf_context_cache == NULL) { - sf_context_cache = (subfiling_context_t *) calloc( - sf_context_limit, sizeof(subfiling_context_t)); + sf_context_cache = (subfiling_context_t *)calloc(sf_context_limit, sizeof(subfiling_context_t)); assert(sf_context_cache != NULL); } if (index == sf_context_limit) { sf_context_limit *= 2; - sf_context_cache = (subfiling_context_t *) realloc(sf_context_cache, - sf_context_limit * sizeof(subfiling_context_t)); + sf_context_cache = (subfiling_context_t *)realloc(sf_context_cache, + sf_context_limit * sizeof(subfiling_context_t)); assert(sf_context_cache != NULL); - } else { - return (void *) &sf_context_cache[index]; } - } else { - printf( - "get__subfiling_object: UNKNOWN Subfiling object type id = 0x%lx\n", - object_id); + else { + return (void *)&sf_context_cache[index]; + } + } + else { + printf("get__subfiling_object: UNKNOWN Subfiling object type id = 0x%lx\n", object_id); } return NULL; } /* end get__subfiling_object() */ - /*------------------------------------------------------------------------- * Function: UTILITY FUNCTIONS: * delete_subfiling_context - removes a context entry in the @@ -199,7 +193,7 @@ delete_subfiling_context(hid_t context_id) MPI_Comm_free(&sf_context->sf_intercomm); } } - free(sf_context); + /* free(sf_context); */ } return; @@ -211,7 +205,7 @@ Public vars (for subfiling) and functions We probably need a function to set and clear this ====================================================== */ -int sf_verbose_flag = 0; +int sf_verbose_flag = 0; int sf_open_file_count = 0; /*------------------------------------------------------------------------- @@ -234,14 +228,16 @@ void set_verbose_flag(int subfile_rank, int new_value) { #ifndef NDEBUG - sf_verbose_flag = (int) (new_value & 0x0FF); + sf_verbose_flag = (int)(new_value & 0x0FF); if (sf_verbose_flag) { char logname[64]; sprintf(logname, "ioc_%d.log", subfile_rank); if (sf_open_file_count > 1) sf_logfile = fopen(logname, "a+"); - else sf_logfile = fopen(logname, "w+"); - } else if (sf_logfile) { + else + sf_logfile = fopen(logname, "w+"); + } + else if (sf_logfile) { fclose(sf_logfile); sf_logfile = NULL; } @@ -287,21 +283,21 @@ record_fid_to_subfile(uint64_t fid, hid_t subfile_context_id, int *next_index) int index; if (sf_file_map_size == 0) { int i; - sf_open_file_map = (file_map_to_context_t *) malloc( - (size_t) DEFAULT_MAP_ENTRIES * sizeof(file_map_to_context_t)); + sf_open_file_map = + (file_map_to_context_t *)malloc((size_t)DEFAULT_MAP_ENTRIES * sizeof(file_map_to_context_t)); if (sf_open_file_map == NULL) { perror("malloc"); return FAIL; } sf_file_map_size = DEFAULT_MAP_ENTRIES; for (i = 0; i < sf_file_map_size; i++) { - sf_open_file_map[i].h5_file_id = (uint64_t)H5I_INVALID_HID; + sf_open_file_map[i].h5_file_id = (uint64_t)H5I_INVALID_HID; sf_open_file_map[i].sf_context_id = 0; } } for (index = 0; index < sf_file_map_size; index++) { if (sf_open_file_map[index].h5_file_id == (uint64_t)H5I_INVALID_HID) { - sf_open_file_map[index].h5_file_id = fid; + sf_open_file_map[index].h5_file_id = fid; sf_open_file_map[index].sf_context_id = subfile_context_id; if (next_index) { @@ -312,8 +308,8 @@ record_fid_to_subfile(uint64_t fid, hid_t subfile_context_id, int *next_index) } if (index == sf_file_map_size) { int i; - sf_open_file_map = realloc(sf_open_file_map, - ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t))); + sf_open_file_map = + realloc(sf_open_file_map, ((size_t)(sf_file_map_size * 2) * sizeof(file_map_to_context_t))); if (sf_open_file_map == NULL) { perror("realloc"); return FAIL; @@ -327,7 +323,7 @@ record_fid_to_subfile(uint64_t fid, hid_t subfile_context_id, int *next_index) *next_index = index; } - sf_open_file_map[index].h5_file_id = fid; + sf_open_file_map[index].h5_file_id = fid; sf_open_file_map[index++].sf_context_id = subfile_context_id; } return status; @@ -370,16 +366,15 @@ record_fid_to_subfile(uint64_t fid, hid_t subfile_context_id, int *next_index) int open_subfile_with_context(subfiling_context_t *sf_context, uint64_t fid, int flags) { - int ret; - int g_errors = 0; - int l_errors = 0; - double start_t = MPI_Wtime(); + int ret; + int g_errors = 0; + int l_errors = 0; + double start_t = MPI_Wtime(); assert(sf_context != NULL); #ifdef VERBOSE - printf("[%s %d]: context_id=%ld\n", __func__, - sf_context->topology->app_layout->world_rank, - sf_context->sf_context_id); + printf("[%s %d]: context_id=%ld\n", __func__, sf_context->topology->app_layout->world_rank, + sf_context->sf_context_id); #endif /* @@ -391,18 +386,22 @@ open_subfile_with_context(subfiling_context_t *sf_context, uint64_t fid, int fla ret = record_fid_to_subfile(fid, sf_context->sf_context_id, NULL); if (ret != SUCCEED) { printf("[%d - %s] Error mapping hdf5 file to a subfiling context\n", - sf_context->topology->app_layout->world_rank, __func__); + sf_context->topology->app_layout->world_rank, __func__); return -1; } if (sf_context->topology->rank_is_ioc) { - sf_work_request_t msg = {{flags,(int64_t)fid, sf_context->sf_context_id}, - OPEN_OP, sf_context->topology->app_layout->world_rank, + sf_work_request_t msg = {{flags, (int64_t)fid, sf_context->sf_context_id}, + OPEN_OP, + sf_context->topology->app_layout->world_rank, sf_context->topology->subfile_rank, - sf_context->sf_context_id, start_t, NULL, 0}; + sf_context->sf_context_id, + start_t, + NULL, + 0}; if (flags & O_CREAT) { - sf_context->sf_fid = -2; + sf_context->sf_fid = -2; } l_errors = subfiling_open_file(&msg, sf_context->topology->subfile_rank, flags); @@ -445,17 +444,34 @@ open_subfile_with_context(subfiling_context_t *sf_context, uint64_t fid, int fla *------------------------------------------------------------------------- */ static int -close__subfiles( subfiling_context_t *sf_context, uint64_t fid) +close__subfiles(subfiling_context_t *sf_context, uint64_t fid) { - int global_errors = 0, errors = 0; - int file_open_count; - int subfile_fid = 0; - double t0 = 0.0, t1 = 0.0, t2 = 0.0; - double t_main_exit = 0.0, t_finalize_threads = 0.0; + int global_errors = 0, errors = 0; + int file_open_count; + int subfile_fid = 0; + double t0 = 0.0, t1 = 0.0, t2 = 0.0; + double t_main_exit = 0.0, t_finalize_threads = 0.0; HDassert((sf_context != NULL)); t0 = MPI_Wtime(); +#if MPI_VERSION >= 3 && MPI_SUBVERSION >= 1 + MPI_Request b_req = MPI_REQUEST_NULL; + int mpi_status = MPI_Ibarrier(MPI_COMM_WORLD, &b_req); + if (mpi_status == MPI_SUCCESS) { + int completed = 0; + while (!completed) { + useconds_t t_delay = 5; + usleep(t_delay); + mpi_status = MPI_Test(&b_req, &completed, MPI_STATUS_IGNORE); + if (mpi_status != MPI_SUCCESS) + completed = 1; + } + } +#else + MPI_Barrier(MPI_COMM_WORLD); +#endif + /* We make the subfile close operation collective. * Otherwise, there may be a race condition between * our closing the subfiles and the user application @@ -482,67 +498,73 @@ close__subfiles( subfiling_context_t *sf_context, uint64_t fid) * as part of the file close. */ if (file_open_count == 1) { - /* Shutdown the main IOC thread */ - H5FD_ioc_set_shutdown_flag(1); - /* Allow ioc_main to exit.*/ - usleep(20); - - t1 = MPI_Wtime(); - H5FD_ioc_wait_thread_main(); - t2 = MPI_Wtime(); - t1 = t2; - t_main_exit = t2-t1; - H5FD_ioc_finalize_threads(); - t2 = MPI_Wtime(); + /* Shutdown the main IOC thread */ + H5FD_ioc_set_shutdown_flag(1); + /* Allow ioc_main to exit.*/ + usleep(20); + + t1 = MPI_Wtime(); + H5FD_ioc_wait_thread_main(); + t2 = MPI_Wtime(); + t1 = t2; + t_main_exit = t2 - t1; + H5FD_ioc_finalize_threads(); + t2 = MPI_Wtime(); } - t_finalize_threads = t2-t1; + t_finalize_threads = t2 - t1; if ((subfile_fid = sf_context->sf_fid) > 0) { - if (HDclose(subfile_fid) < 0) + if (HDclose(subfile_fid) < 0) { + perror("close(subfile_fid)"); errors++; - else { - sf_context->sf_fid = -1; - } + } + else { + sf_context->sf_fid = -1; + } } #ifndef NDEBUG - /* FIXME: If we've had multiple files open, our statistics - * will be messed up! + /* FIXME: If we've had multiple files open, our statistics + * will be messed up! */ if (sf_verbose_flag) { t1 = t2; if (sf_logfile != NULL) { - fprintf(sf_logfile, "[%d] main_exit=%lf, finalize_threads=%lf\n", - sf_context->sf_group_rank, t_main_exit, t_finalize_threads); - if (SF_WRITE_OPS > 0) - fprintf(sf_logfile, "[%d] pwrite perf: wrt_ops=%ld wait=%lf pwrite=%lf IOC_shutdown = %lf seconds\n", - sf_context->sf_group_rank, SF_WRITE_OPS, SF_WRITE_WAIT_TIME, SF_WRITE_TIME, (t1 - t0)); - if (SF_READ_OPS > 0) - fprintf(sf_logfile, "[%d] pread perf: read_ops=%ld wait=%lf pread=%lf IOC_shutdown = %lf seconds\n", - sf_context->sf_group_rank, SF_READ_OPS, SF_READ_WAIT_TIME, SF_READ_TIME, (t1 - t0)); - - fprintf(sf_logfile,"[%d] Avg queue time=%lf seconds\n", sf_context->sf_group_rank, - SF_QUEUE_DELAYS/(double)(SF_WRITE_OPS + SF_READ_OPS)); - - fflush(sf_logfile); - - fclose(sf_logfile); - sf_logfile = NULL; + fprintf(sf_logfile, "[%d] main_exit=%lf, finalize_threads=%lf\n", sf_context->sf_group_rank, + t_main_exit, t_finalize_threads); + if (SF_WRITE_OPS > 0) + fprintf(sf_logfile, + "[%d] pwrite perf: wrt_ops=%ld wait=%lf pwrite=%lf IOC_shutdown = %lf seconds\n", + sf_context->sf_group_rank, SF_WRITE_OPS, SF_WRITE_WAIT_TIME, SF_WRITE_TIME, + (t1 - t0)); + if (SF_READ_OPS > 0) + fprintf(sf_logfile, + "[%d] pread perf: read_ops=%ld wait=%lf pread=%lf IOC_shutdown = %lf seconds\n", + sf_context->sf_group_rank, SF_READ_OPS, SF_READ_WAIT_TIME, SF_READ_TIME, + (t1 - t0)); + + fprintf(sf_logfile, "[%d] Avg queue time=%lf seconds\n", sf_context->sf_group_rank, + SF_QUEUE_DELAYS / (double)(SF_WRITE_OPS + SF_READ_OPS)); + + fflush(sf_logfile); + + fclose(sf_logfile); + sf_logfile = NULL; } } - if (sf_context->filename) { - free(sf_context->filename); - sf_context->filename = NULL; + if (sf_context->h5_filename) { + free(sf_context->h5_filename); + sf_context->h5_filename = NULL; } if (sf_context->subfile_prefix) { - free(sf_context->subfile_prefix); - sf_context->subfile_prefix = NULL; + free(sf_context->subfile_prefix); + sf_context->subfile_prefix = NULL; } #endif } - MPI_Allreduce(&errors, &global_errors, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(&errors, &global_errors, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); #ifndef NDEBUG if (sf_verbose_flag) { @@ -551,9 +573,7 @@ close__subfiles( subfiling_context_t *sf_context, uint64_t fid) client_log = NULL; } } -#endif - - +#endif return global_errors; } /* end close__subfiles() */ @@ -568,83 +588,80 @@ being thread safe. */ int -sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, - int subfile_rank) +sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) { - int ret = 0; - int retries = MIN_RETRIES; - useconds_t delay = 100; - ssize_t bytes_read; - ssize_t bytes_remaining = (ssize_t) data_size; - char * this_buffer = data_buffer; + int ret = 0; + int retries = MIN_RETRIES; + useconds_t delay = 100; + ssize_t bytes_read; + ssize_t bytes_remaining = (ssize_t)data_size; + char * this_buffer = data_buffer; while (bytes_remaining) { - if ((bytes_read = (ssize_t) pread( - fd, this_buffer, (size_t) bytes_remaining, file_offset)) < 0) { + if ((bytes_read = (ssize_t)pread(fd, this_buffer, (size_t)bytes_remaining, file_offset)) < 0) { perror("pread failed!"); HDprintf("[ioc(%d) %s] pread(fd, buf, bytes_remaining=%ld, " - "file_offset =%ld)\n", - subfile_rank, __func__, bytes_remaining, file_offset); + "file_offset =%ld)\n", + subfile_rank, __func__, bytes_remaining, file_offset); HDfflush(stdout); return -1; - } else if (bytes_read > 0) { + } + else if (bytes_read > 0) { /* reset retry params */ - retries = MIN_RETRIES; - delay = 100; + retries = MIN_RETRIES; + delay = 100; bytes_remaining -= bytes_read; #ifdef VERBOSE - printf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%ld\n",subfile_rank, - __func__, bytes_read, bytes_remaining, file_offset); + printf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%ld\n", subfile_rank, __func__, + bytes_read, bytes_remaining, file_offset); fflush(stdout); #endif this_buffer += bytes_read; file_offset += bytes_read; - - } else { - if (retries == 0) { + } + else { + if (retries == 0) { #ifdef VERBOSE - printf("[ioc(%d) %s] TIMEOUT: file_offset=%ld, data_size=%ld\n", - subfile_rank, __func__, file_offset, data_size); - printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", - subfile_rank, __func__); + printf("[ioc(%d) %s] TIMEOUT: file_offset=%ld, data_size=%ld\n", subfile_rank, __func__, + file_offset, data_size); + printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", subfile_rank, __func__); fflush(stdout); #endif return -2; } - retries--; - usleep(delay); - delay *= 2; + retries--; + usleep(delay); + delay *= 2; } } return ret; } /* end sf_read_data() */ int -sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, - int subfile_rank) +sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) { - int ret = 0; - char * this_data = (char *) data_buffer; - ssize_t bytes_remaining = (ssize_t) data_size; - ssize_t written = 0; + int ret = 0; + char * this_data = (char *)data_buffer; + ssize_t bytes_remaining = (ssize_t)data_size; + ssize_t written = 0; while (bytes_remaining) { - if ((written = pwrite( - fd, this_data, (size_t) bytes_remaining, file_offset)) < 0) { + if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) { struct stat statbuf; perror("pwrite failed!"); fstat(fd, &statbuf); HDprintf("[ioc(%d) %s] pwrite(fd, data, bytes_remaining=%ld, " - "file_offset=%ld), fd=%d, st_size=%ld\n", - subfile_rank, __func__, bytes_remaining, file_offset, fd, statbuf.st_size); + "file_offset=%ld), fd=%d, st_size=%ld\n", + subfile_rank, __func__, bytes_remaining, file_offset, fd, statbuf.st_size); HDfflush(stdout); return -1; - } else { + } + else { bytes_remaining -= written; #ifdef VERBOSE - printf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%ld\n",subfile_rank, - __func__, written, bytes_remaining, file_offset); + printf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%ld\n", subfile_rank, __func__, + written, bytes_remaining, file_offset); fflush(stdout); #endif this_data += written; @@ -660,7 +677,6 @@ sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, return ret; } /* end sf_write_data() */ - /* * --------------------------------------------------- * Topology discovery related functions for choosing @@ -700,8 +716,8 @@ sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, static int compare_hostid(const void *h1, const void *h2) { - const layout_t *host1 = (const layout_t *) h1; - const layout_t *host2 = (const layout_t *) h2; + const layout_t *host1 = (const layout_t *)h1; + const layout_t *host2 = (const layout_t *)h2; return (host1->hostid > host2->hostid); } @@ -725,8 +741,8 @@ compare_hostid(const void *h1, const void *h2) static void gather_topology_info(sf_topology_t *info) { - int sf_world_size; - int sf_world_rank; + int sf_world_size; + int sf_world_rank; app_layout_t *app_layout = NULL; HDassert(info != NULL); @@ -739,24 +755,22 @@ gather_topology_info(sf_topology_t *info) return; if (1) { - long hostid = gethostid(); - layout_t my_hostinfo; - app_layout->layout = - (layout_t *) calloc((size_t) sf_world_size + 1, sizeof(layout_t)); + long hostid = gethostid(); + layout_t my_hostinfo; + app_layout->layout = (layout_t *)calloc((size_t)sf_world_size + 1, sizeof(layout_t)); if (app_layout->layout == NULL) { perror("calloc failure!"); MPI_Abort(MPI_COMM_WORLD, 1); } app_layout->hostid = hostid; - my_hostinfo.rank = sf_world_rank; - my_hostinfo.hostid = hostid; + my_hostinfo.rank = sf_world_rank; + my_hostinfo.hostid = hostid; app_layout->layout[sf_world_rank] = my_hostinfo; if (sf_world_size > 1) { - if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG, app_layout->layout, 2, - MPI_LONG, MPI_COMM_WORLD) == MPI_SUCCESS) { - qsort(app_layout->layout, (size_t) sf_world_size, sizeof(layout_t), - compare_hostid); + if (MPI_Allgather(&my_hostinfo, 2, MPI_LONG, app_layout->layout, 2, MPI_LONG, MPI_COMM_WORLD) == + MPI_SUCCESS) { + qsort(app_layout->layout, (size_t)sf_world_size, sizeof(layout_t), compare_hostid); } } } @@ -785,9 +799,9 @@ gather_topology_info(sf_topology_t *info) static int count_nodes(sf_topology_t *info, int my_rank) { - int k, node_count, hostid_index = -1; + int k, node_count, hostid_index = -1; app_layout_t *app_layout = NULL; - long nextid; + long nextid; HDassert(info != NULL); HDassert((app_layout = info->app_layout) != NULL); @@ -797,8 +811,7 @@ count_nodes(sf_topology_t *info, int my_rank) } if (app_layout->node_ranks == NULL) { - app_layout->node_ranks = (int *) - calloc((size_t)(app_layout->world_size + 1), sizeof(int)); + app_layout->node_ranks = (int *)calloc((size_t)(app_layout->world_size + 1), sizeof(int)); } HDassert(app_layout->node_ranks != NULL); @@ -809,8 +822,8 @@ count_nodes(sf_topology_t *info, int my_rank) hostid_index = 0; } - app_layout->node_ranks[0] = 0; /* Add index */ - node_count = 1; + app_layout->node_ranks[0] = 0; /* Add index */ + node_count = 1; /* Recall that the topology array has been sorted! */ for (k = 1; k < app_layout->world_size; k++) { @@ -827,11 +840,10 @@ count_nodes(sf_topology_t *info, int my_rank) /* Mark the end of the node_ranks */ app_layout->node_ranks[node_count] = app_layout->world_size; /* Save the index where we first located my hostid */ - app_layout->node_index = hostid_index; + app_layout->node_index = hostid_index; return app_layout->node_count = node_count; } /* end count_nodes() */ - /*------------------------------------------------------------------------- * Function: identify_ioc_ranks * @@ -848,34 +860,34 @@ count_nodes(sf_topology_t *info, int my_rank) * As a side effect, we fill the 'ioc_concentrator' vector * and set the 'rank_is_ioc' flag to TRUE if our rank is * identified as owning an IO Concentrator (IOC). - * + * *------------------------------------------------------------------------- */ -static int +static int identify_ioc_ranks(int node_count, int iocs_per_node, sf_topology_t *info) { - int n; - int total_ioc_count = 0; - app_layout_t *app_layout = NULL; + int n; + int total_ioc_count = 0; + app_layout_t *app_layout = NULL; HDassert(info != NULL); - HDassert((app_layout = info->app_layout) != NULL); + HDassert((app_layout = info->app_layout) != NULL); - for (n=0; n < node_count; n++) { + for (n = 0; n < node_count; n++) { int k; - int node_index = app_layout->node_ranks[n]; - int local_peer_count = app_layout->node_ranks[n+1] - app_layout->node_ranks[n]; + int node_index = app_layout->node_ranks[n]; + int local_peer_count = app_layout->node_ranks[n + 1] - app_layout->node_ranks[n]; info->io_concentrator[total_ioc_count++] = (int)(app_layout->layout[node_index++].rank); - if (app_layout->layout[node_index-1].rank == app_layout->world_rank) { - info->subfile_rank = total_ioc_count-1; - info->rank_is_ioc = TRUE; + if (app_layout->layout[node_index - 1].rank == app_layout->world_rank) { + info->subfile_rank = total_ioc_count - 1; + info->rank_is_ioc = TRUE; } - for(k=1; k < iocs_per_node; k++) { + for (k = 1; k < iocs_per_node; k++) { if (k < local_peer_count) { if (app_layout->layout[node_index].rank == app_layout->world_rank) { - info->rank_is_ioc = TRUE; + info->rank_is_ioc = TRUE; info->subfile_rank = total_ioc_count; } info->io_concentrator[total_ioc_count++] = (int)(app_layout->layout[node_index++].rank); @@ -888,18 +900,18 @@ identify_ioc_ranks(int node_count, int iocs_per_node, sf_topology_t *info) } /* end identify_ioc_ranks() */ static inline void -assign_ioc_ranks(int *io_concentrator, int ioc_count, int rank_multiple, sf_topology_t *app_topology ) +assign_ioc_ranks(int *io_concentrator, int ioc_count, int rank_multiple, sf_topology_t *app_topology) { app_layout_t *app_layout = NULL; /* Validate that the input pointers are not NULL */ HDassert(io_concentrator); HDassert(app_topology); HDassert((app_layout = app_topology->app_layout) != NULL); - /* fill the io_concentrator values based on the application layout */ + /* fill the io_concentrator values based on the application layout */ if (io_concentrator) { int k, ioc_next, ioc_index; - for ( k=0, ioc_next = 0; ioc_next < ioc_count; ioc_next++) { - ioc_index = rank_multiple * k++; + for (k = 0, ioc_next = 0; ioc_next < ioc_count; ioc_next++) { + ioc_index = rank_multiple * k++; io_concentrator[ioc_next] = (int)(app_layout->layout[ioc_index].rank); if (io_concentrator[ioc_next] == app_layout->world_rank) app_topology->rank_is_ioc = TRUE; @@ -908,8 +920,6 @@ assign_ioc_ranks(int *io_concentrator, int ioc_count, int rank_multiple, sf_topo } } /* end assign_ioc_ranks() */ - - /*------------------------------------------------------------------------- * Function: fid_map_to_context * @@ -932,7 +942,7 @@ fid_map_to_context(uint64_t sf_fid) if (sf_open_file_map) { int i; for (i = 0; i < sf_file_map_size; i++) { - hid_t sf_context_id = sf_open_file_map[i].sf_context_id; + hid_t sf_context_id = sf_open_file_map[i].sf_context_id; if (sf_open_file_map[i].h5_file_id == sf_fid) { return sf_context_id; } @@ -941,7 +951,6 @@ fid_map_to_context(uint64_t sf_fid) return H5I_INVALID_HID; } /* end fid_map_to_context() */ - /*------------------------------------------------------------------------- * Function: clear_fid_map_entry * @@ -965,7 +974,7 @@ clear_fid_map_entry(uint64_t sf_fid) int i; for (i = 0; i < sf_file_map_size; i++) { if (sf_open_file_map[i].h5_file_id == sf_fid) { - sf_open_file_map[i].h5_file_id = (uint64_t)H5I_INVALID_HID; + sf_open_file_map[i].h5_file_id = (uint64_t)H5I_INVALID_HID; sf_open_file_map[i].sf_context_id = 0; return; } @@ -1037,39 +1046,38 @@ active_map_entries(void) *------------------------------------------------------------------------- */ int -H5FD__determine_ioc_count(int world_size, int world_rank, - ioc_selection_t ioc_select_method, char *ioc_select_option, - sf_topology_t **thisapp) +H5FD__determine_ioc_count(int world_size, int world_rank, ioc_selection_t ioc_select_method, + char *ioc_select_option, sf_topology_t **thisapp) { - int ioc_count = 0; + int ioc_count = 0; ioc_selection_t ioc_selection = ioc_selection_options; - sf_topology_t * app_topology = NULL; + sf_topology_t * app_topology = NULL; HDassert(thisapp != NULL); if (!ioc_count || (ioc_selection != ioc_select_method)) { - int rank_multiple = 0; - int iocs_per_node = 1; - char * envValue = NULL; - int * io_concentrator = NULL; + int rank_multiple = 0; + int iocs_per_node = 1; + char *envValue = NULL; + int * io_concentrator = NULL; if ((app_topology = *thisapp) == NULL) { - app_topology = (sf_topology_t *) calloc(1, sizeof(sf_topology_t)); + app_topology = (sf_topology_t *)calloc(1, sizeof(sf_topology_t)); HDassert(app_topology != NULL); - } + } if (sf_app_layout == NULL) { - sf_app_layout = (app_layout_t *) calloc(1, sizeof(app_layout_t)); - HDassert (sf_app_layout != NULL); - } - /* Once the application layout has been filled once, any additional + sf_app_layout = (app_layout_t *)calloc(1, sizeof(app_layout_t)); + HDassert(sf_app_layout != NULL); + } + /* Once the application layout has been filled once, any additional * file open operations won't be required to gather that information. */ - app_topology->app_layout = sf_app_layout; + app_topology->app_layout = sf_app_layout; sf_app_layout->world_size = world_size; sf_app_layout->world_rank = world_rank; if (app_topology->io_concentrator == NULL) { app_topology->io_concentrator = io_concentrator = - (int *) HDmalloc(((size_t) world_size * sizeof(int))); + (int *)HDmalloc(((size_t)world_size * sizeof(int))); } assert(io_concentrator != NULL); app_topology->selection_type = ioc_selection = ioc_select_method; @@ -1082,13 +1090,13 @@ H5FD__determine_ioc_count(int world_size, int world_rank, if (ioc_select_method == SELECT_IOC_TOTAL) { if (ioc_select_option) { int checkValue = atoi(ioc_select_option); - if ((checkValue <= 0) || (checkValue >= world_size)) { + if ((checkValue <= 0) || (checkValue >= world_size)) { ioc_select_method = SELECT_IOC_ONE_PER_NODE; goto next; } - ioc_count = checkValue; - rank_multiple = (world_size/checkValue); + ioc_count = checkValue; + rank_multiple = (world_size / checkValue); assign_ioc_ranks(io_concentrator, ioc_count, rank_multiple, app_topology); *thisapp = app_topology; } @@ -1112,8 +1120,8 @@ H5FD__determine_ioc_count(int world_size, int world_rank, goto next; } rank_multiple = checkValue; - ioc_count = (world_size / rank_multiple); - + ioc_count = (world_size / rank_multiple); + if ((world_size % rank_multiple) != 0) { ioc_count++; } @@ -1131,35 +1139,35 @@ next: if (ioc_select_method == SELECT_IOC_ONE_PER_NODE) { app_topology->selection_type = ioc_select_method; - ioc_count = count_nodes(app_topology, world_rank); + ioc_count = count_nodes(app_topology, world_rank); - if ((envValue = HDgetenv("IOC_COUNT_PER_NODE")) != NULL) { + if ((envValue = HDgetenv("H5_IOC_COUNT_PER_NODE")) != NULL) { int value_check = atoi(envValue); if (value_check > 0) { iocs_per_node = value_check; } } - ioc_count = identify_ioc_ranks( ioc_count, iocs_per_node, app_topology); + ioc_count = identify_ioc_ranks(ioc_count, iocs_per_node, app_topology); } if (ioc_count > 0) { app_topology->n_io_concentrators = ioc_count; /* Create a vector of "potential" file descriptors - * which can be indexed by the IOC id. + * which can be indexed by the IOC id. */ - app_topology->subfile_fd = (int *) HDcalloc((size_t)ioc_count, sizeof(int)); + app_topology->subfile_fd = (int *)HDcalloc((size_t)ioc_count, sizeof(int)); if (app_topology->subfile_fd == NULL) { HDputs("Failed to allocate vector of subfile fds"); } *thisapp = app_topology; } - } else { + } + else { HDputs("Unable to create app_toplogy"); } return ioc_count; } /* end H5FD__determine_ioc_count() */ - /* ------------------------------------------------------------------------- Programmer: Richard Warren @@ -1193,8 +1201,9 @@ get_ioc_selection_criteria(ioc_selection_t *selection) if ((checkValue < 0) || (checkValue >= ioc_selection_options)) { *selection = SELECT_IOC_ONE_PER_NODE; return NULL; - } else { - *selection = (ioc_selection_t) checkValue; + } + else { + *selection = (ioc_selection_t)checkValue; return optValue; } } @@ -1219,9 +1228,10 @@ get_ioc_selection_criteria(ioc_selection_t *selection) *------------------------------------------------------------------------- */ int -H5FD__init_subfile_context(sf_topology_t *thisApp, int n_iocs, int world_rank, subfiling_context_t *newContext) +H5FD__init_subfile_context(sf_topology_t *thisApp, int n_iocs, int world_rank, + subfiling_context_t *newContext) { - MPI_Comm sf_msg_comm = MPI_COMM_NULL; + MPI_Comm sf_msg_comm = MPI_COMM_NULL; MPI_Comm sf_data_comm = MPI_COMM_NULL; assert(newContext != NULL); @@ -1229,37 +1239,35 @@ H5FD__init_subfile_context(sf_topology_t *thisApp, int n_iocs, int world_rank, s int status; char *envValue = NULL; - newContext->topology = thisApp; - newContext->sf_msg_comm = MPI_COMM_NULL; - newContext->sf_data_comm = MPI_COMM_NULL; - newContext->sf_group_comm = MPI_COMM_NULL; - newContext->sf_intercomm = MPI_COMM_NULL; + newContext->topology = thisApp; + newContext->sf_msg_comm = MPI_COMM_NULL; + newContext->sf_data_comm = MPI_COMM_NULL; + newContext->sf_group_comm = MPI_COMM_NULL; + newContext->sf_intercomm = MPI_COMM_NULL; newContext->sf_stripe_size = H5FD_DEFAULT_STRIPE_DEPTH; newContext->sf_write_count = 0; - newContext->sf_read_count = 0; - newContext->sf_eof = 0; + newContext->sf_read_count = 0; + newContext->sf_eof = 0; - if ((envValue = HDgetenv("IOC_STRIPE_SIZE")) != NULL) { + if ((envValue = HDgetenv("H5_IOC_STRIPE_SIZE")) != NULL) { long value_check = atol(envValue); if (value_check > 0) { - newContext->sf_stripe_size = (int64_t) value_check; + newContext->sf_stripe_size = (int64_t)value_check; } } - if ((envValue = HDgetenv("IOC_SUBFILE_PREFIX")) != NULL) { + if ((envValue = HDgetenv("H5_IOC_SUBFILE_PREFIX")) != NULL) { char temp[PATH_MAX]; sprintf(temp, "%s", envValue); newContext->subfile_prefix = strdup(temp); /* sf_subfile_prefix = strdup(temp); */ } - newContext->sf_blocksize_per_stripe = - newContext->sf_stripe_size * n_iocs; + newContext->sf_blocksize_per_stripe = newContext->sf_stripe_size * n_iocs; if (sf_msg_comm == MPI_COMM_NULL) { status = MPI_Comm_dup(MPI_COMM_WORLD, &newContext->sf_msg_comm); if (status != MPI_SUCCESS) goto err_exit; - status = MPI_Comm_set_errhandler( - newContext->sf_msg_comm, MPI_ERRORS_RETURN); + status = MPI_Comm_set_errhandler(newContext->sf_msg_comm, MPI_ERRORS_RETURN); if (status != MPI_SUCCESS) goto err_exit; sf_msg_comm = newContext->sf_msg_comm; @@ -1268,31 +1276,29 @@ H5FD__init_subfile_context(sf_topology_t *thisApp, int n_iocs, int world_rank, s status = MPI_Comm_dup(MPI_COMM_WORLD, &newContext->sf_data_comm); if (status != MPI_SUCCESS) goto err_exit; - status = MPI_Comm_set_errhandler( - newContext->sf_data_comm, MPI_ERRORS_RETURN); + status = MPI_Comm_set_errhandler(newContext->sf_data_comm, MPI_ERRORS_RETURN); if (status != MPI_SUCCESS) goto err_exit; sf_data_comm = newContext->sf_data_comm; } if (n_iocs > 1) { - status = MPI_Comm_split(MPI_COMM_WORLD, thisApp->rank_is_ioc, - world_rank, &newContext->sf_group_comm); + status = + MPI_Comm_split(MPI_COMM_WORLD, thisApp->rank_is_ioc, world_rank, &newContext->sf_group_comm); if (status != MPI_SUCCESS) goto err_exit; - status = MPI_Comm_size( - newContext->sf_group_comm, &newContext->sf_group_size); + status = MPI_Comm_size(newContext->sf_group_comm, &newContext->sf_group_size); if (status != MPI_SUCCESS) goto err_exit; - status = MPI_Comm_rank( - newContext->sf_group_comm, &newContext->sf_group_rank); + status = MPI_Comm_rank(newContext->sf_group_comm, &newContext->sf_group_rank); if (status != MPI_SUCCESS) goto err_exit; /* * There may be additional functionality we need for the IOCs... * If so, then can probably initialize those things here! */ - } else { + } + else { newContext->sf_group_size = 1; newContext->sf_group_rank = 0; } @@ -1303,7 +1309,6 @@ err_exit: return -1; } /* end H5FD__init_subfile_context() */ - /* ------------------------------------------------------------------------- Programmer: Richard Warren @@ -1329,12 +1334,12 @@ H5FDsubfiling_init(ioc_selection_t ioc_select_method, char *ioc_select_option, i herr_t ret_value = SUCCEED; int ioc_count; int world_rank, world_size; - sf_topology_t * thisApp = NULL; + sf_topology_t * thisApp = NULL; int file_index = active_map_entries(); - int64_t tag = SF_CONTEXT; + int64_t tag = SF_CONTEXT; int64_t context_id = ((tag << 32) | file_index); - subfiling_context_t *newContext = (subfiling_context_t *) get__subfiling_object(context_id); - char *envValue = NULL; + subfiling_context_t *newContext = (subfiling_context_t *)get__subfiling_object(context_id); + char * envValue = NULL; FUNC_ENTER_API(FAIL) H5TRACE3("e", "IO*s*!", ioc_select_method, ioc_select_option, sf_context); @@ -1351,8 +1356,8 @@ H5FDsubfiling_init(ioc_selection_t ioc_select_method, char *ioc_select_option, i } /* Compute the number an distribution map of the set of IO Concentrators */ - if ((ioc_count = H5FD__determine_ioc_count(world_size, world_rank, - ioc_select_method, ioc_select_option, &thisApp)) <= 0) { + if ((ioc_count = H5FD__determine_ioc_count(world_size, world_rank, ioc_select_method, ioc_select_option, + &thisApp)) <= 0) { HDputs("Unable to register subfiling topology!"); ret_value = FAIL; goto done; @@ -1361,15 +1366,16 @@ H5FDsubfiling_init(ioc_selection_t ioc_select_method, char *ioc_select_option, i newContext->sf_context_id = context_id; /* Maybe set the verbose flag for more debugging info */ - envValue = HDgetenv("SF_VERBOSE_FLAG"); + envValue = HDgetenv("H5_SF_VERBOSE_FLAG"); if (envValue != NULL) { int check_value = atoi(envValue); - if (check_value > 0) sf_verbose_flag = 1; + if (check_value > 0) + sf_verbose_flag = 1; } /* Maybe open client-side log files */ - if (sf_verbose_flag ) { - manage_client_logfile(world_rank,sf_verbose_flag); + if (sf_verbose_flag) { + manage_client_logfile(world_rank, sf_verbose_flag); } if (H5FD__init_subfile_context(thisApp, ioc_count, world_rank, newContext) != SUCCEED) { @@ -1382,6 +1388,11 @@ H5FDsubfiling_init(ioc_selection_t ioc_select_method, char *ioc_select_option, i ret_value = FAIL; goto done; } + + newContext->sf_base_addr = 0; + if (newContext->topology->rank_is_ioc) { + newContext->sf_base_addr = (int64_t)(newContext->topology->subfile_rank * newContext->sf_stripe_size); + } *sf_context = context_id; done: @@ -1427,7 +1438,7 @@ H5FD__open_subfiles(void *_config_info, uint64_t h5_file_id, int flags) // char filepath[PATH_MAX]; // char *slash; config_common_t *config_info = _config_info; - char *option_arg = get_ioc_selection_criteria(&ioc_selection); + char * option_arg = get_ioc_selection_criteria(&ioc_selection); HDassert(config_info); /* Check to see who is calling ths function:: @@ -1437,7 +1448,7 @@ H5FD__open_subfiles(void *_config_info, uint64_t h5_file_id, int flags) (config_info->magic != H5FD_SUBFILING_FAPL_T_MAGIC)) { HDputs("Unrecgonized driver!"); return -1; - } + } /* Initialize/identify IO Concentrators based on the * config information that we have... @@ -1451,19 +1462,19 @@ H5FD__open_subfiles(void *_config_info, uint64_t h5_file_id, int flags) /* For statistics gathering */ maybe_initialize_statistics(); - /* Create a new context which is associated with + /* Create a new context which is associated with * this file (context_id) */ sf_context = get__subfiling_object(context_id); assert(sf_context != NULL); /* Save some basic things in the new context */ - config_info->context_id = context_id; - sf_context->sf_fid = 0; + config_info->context_id = context_id; + sf_context->sf_fid = 0; sf_context->sf_context_id = context_id; - sf_context->h5_file_id = h5_file_id; - sf_context->filename = strdup(config_info->file_path); - + sf_context->h5_file_id = h5_file_id; + sf_context->h5_filename = strdup(config_info->file_path); + sf_context->sf_filename = NULL; /* Ensure that the IOC service won't exit * as we prepare to start up.. */ @@ -1499,4 +1510,3 @@ H5FD__close_subfiles(int64_t context_id) assert(sf_context != NULL); return close__subfiles(sf_context, sf_context->h5_file_id); } - diff --git a/src/H5FDsubfile_mpi.c b/src/H5FDsubfile_mpi.c index 64a3959..6e060c5 100644 --- a/src/H5FDsubfile_mpi.c +++ b/src/H5FDsubfile_mpi.c @@ -13,17 +13,17 @@ #include "H5FDsubfiling.h" -static int sf_close_file_count = 0; +static int sf_close_file_count = 0; static int sf_ops_after_first_close = 0; -static int sf_enable_directIO = 0; +static int sf_enable_directIO = 0; -static int sf_write_ops = 0; -static double sf_pwrite_time = 0.0; +static int sf_write_ops = 0; +static double sf_pwrite_time = 0.0; static double sf_write_wait_time = 0.0; -static int sf_read_ops = 0; -static double sf_pread_time = 0.0; -static double sf_read_wait_time = 0.0; +static int sf_read_ops = 0; +static double sf_pread_time = 0.0; +static double sf_read_wait_time = 0.0; static double sf_queue_delay_time = 0.0; /* The following is our basic template for a subfile filename. @@ -32,18 +32,18 @@ static double sf_queue_delay_time = 0.0; * zeroth subfile as well as for all metadata. */ #define SF_NODE_LOCAL_TEMPLATE "%ld_node_local_%d_of_%d" -#define SF_FILENAME_TEMPLATE "%ld_subfile_%d_of_%d" +#define SF_FILENAME_TEMPLATE "%ld_subfile_%d_of_%d" static int *request_count_per_rank = NULL; -atomic_int sf_workinprogress = 0; -atomic_int sf_work_pending = 0; -atomic_int sf_file_open_count = 0; -atomic_int sf_file_close_count = 0; -atomic_int sf_file_refcount = 0; -atomic_int sf_ioc_fini_refcount = 0; -atomic_int sf_ioc_ready = 0; -volatile int sf_shutdown_flag = 0; +atomic_int sf_workinprogress = 0; +atomic_int sf_work_pending = 0; +atomic_int sf_file_open_count = 0; +atomic_int sf_file_close_count = 0; +atomic_int sf_file_refcount = 0; +atomic_int sf_ioc_fini_refcount = 0; +atomic_int sf_ioc_ready = 0; +volatile int sf_shutdown_flag = 0; /* * Structure definitions to enable async io completions @@ -52,69 +52,68 @@ volatile int sf_shutdown_flag = 0; * invoked. See below. */ typedef struct _client_io_args { - int ioc; /* ID of the IO Concentrator handling this IO. */ - hid_t context_id; /* The context id provided for the read or write */ - int64_t offset; /* The file offset for the IO operation */ - int64_t elements; /* How many bytes */ - void *data; /* A pointer to the (contiguous) data segment */ - MPI_Request io_req; /* An MPI request to allow the code to loop while */ - /* making progress on multiple IOs */ + int ioc; /* ID of the IO Concentrator handling this IO. */ + hid_t context_id; /* The context id provided for the read or write */ + int64_t offset; /* The file offset for the IO operation */ + int64_t elements; /* How many bytes */ + void * data; /* A pointer to the (contiguous) data segment */ + MPI_Request io_req; /* An MPI request to allow the code to loop while */ + /* making progress on multiple IOs */ } io_args_t; /* pre-define */ typedef struct _client_io_func io_func_t; struct _client_io_func { - int (*io_function)(void *this_io); /* pointer to a completion function */ - io_args_t io_args; /* arguments passed to the completion function */ - int pending; /* The function is complete (0) or pending (1)? */ + int (*io_function)(void *this_io); /* pointer to a completion function */ + io_args_t io_args; /* arguments passed to the completion function */ + int pending; /* The function is complete (0) or pending (1)? */ }; typedef struct _io_req { - struct _io_req *prev; /* A simple list structure containing completion */ - struct _io_req *next; /* functions. These should get removed as IO ops */ - io_func_t completion_func; /* are completed */ + struct _io_req *prev; /* A simple list structure containing completion */ + struct _io_req *next; /* functions. These should get removed as IO ops */ + io_func_t completion_func; /* are completed */ } io_req_t; -int n_io_pending = 0; +int n_io_pending = 0; io_req_t pending_io_requests; typedef struct _client_xfer_info { - int64_t offset; - int64_t length; - int ioc_targets; - io_op_t op; + int64_t offset; + int64_t length; + int ioc_targets; + io_op_t op; } client_xfer_info_t; typedef struct _xfer_info { - int64_t offset; - int64_t length; + int64_t offset; + int64_t length; } xfer_info_t; #define STAT_BLOCKSIZE 1024 typedef struct _ioc_stats { - int read_index; - int read_size; - xfer_info_t *read_info; - int write_index; - int write_size; - xfer_info_t *write_info; + int read_index; + int read_size; + xfer_info_t *read_info; + int write_index; + int write_size; + xfer_info_t *write_info; } ioc_stats_t; static ioc_stats_t ioc_xfer_records; -int client_op_index = 0; -int client_op_size = 0; -client_xfer_info_t *client_ops = NULL; +int client_op_index = 0; +int client_op_size = 0; +client_xfer_info_t *client_ops = NULL; /* const char *sf_subfile_prefix = "."; */ -#define MAX_WORK_PER_RANK 2 -#define K(n) ((n)*1024) -#define M(n) ((n) * (1024 * 1024)) +#define MAX_WORK_PER_RANK 2 +#define K(n) ((n)*1024) +#define M(n) ((n) * (1024 * 1024)) #define DEFAULT_STRIPE_SIZE M(32) -#define MAX_DEPTH 1024 - +#define MAX_DEPTH 1024 /* ========================================= @@ -122,17 +121,18 @@ Private functions ========================================= */ -static inline void *cast_to_void(const void *data) { - union { - const void *const_ptr_to_data; - void *ptr_to_data; - } eliminate_const_warning; - eliminate_const_warning.const_ptr_to_data = data; - return eliminate_const_warning.ptr_to_data; +static inline void * +cast_to_void(const void *data) +{ + union { + const void *const_ptr_to_data; + void * ptr_to_data; + } eliminate_const_warning; + eliminate_const_warning.const_ptr_to_data = data; + return eliminate_const_warning.ptr_to_data; } -static char *get_ioc_subfile_path(int ioc, int ioc_count, - subfiling_context_t *sf_context); -static int async_completion(void *arg); +static char *get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context); +static int async_completion(void *arg); /* ===================================================================== */ /* MPI_Datatype Creation functions. @@ -176,49 +176,49 @@ static int async_completion(void *arg); /* Fill the output vectors 'io_offset', 'io_datasize' and 'io_f_offset' * All calculations are in terms of bytes. */ -static void H5FD__create_first_mpi_type( - subfiling_context_t *context, int ioc_depth, int64_t src_offset, - int64_t target_datasize, int64_t f_offset, int64_t *io_offset, - int64_t *io_datasize, int64_t *io_f_offset, int64_t first_io) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t offset_in_stripe = f_offset % stripe_size; - int64_t next_offset = blocksize_per_stripe - offset_in_stripe; - int64_t total_bytes = first_io; - - io_offset[0] = src_offset; - io_datasize[0] = first_io; - io_f_offset[0] = f_offset; +static void +H5FD__create_first_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, + int64_t *io_datasize, int64_t *io_f_offset, int64_t first_io) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t offset_in_stripe = f_offset % stripe_size; + int64_t next_offset = blocksize_per_stripe - offset_in_stripe; + int64_t total_bytes = first_io; + + io_offset[0] = src_offset; + io_datasize[0] = first_io; + io_f_offset[0] = f_offset; #ifdef VERBOSE - printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, src_offset, first_io, f_offset); - fflush(stdout); + printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, first_io, f_offset); + fflush(stdout); #endif - if (first_io == target_datasize) { - return; - } - if (first_io) { - int k; - f_offset += (blocksize_per_stripe - offset_in_stripe); - for (k = 1; k <= ioc_depth; k++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; - total_bytes += stripe_size; + if (first_io == target_datasize) { + return; + } + if (first_io) { + int k; + f_offset += (blocksize_per_stripe - offset_in_stripe); + for (k = 1; k <= ioc_depth; k++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; + total_bytes += stripe_size; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, stripe_size, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, + stripe_size, f_offset); + fflush(stdout); #endif - f_offset += context->sf_blocksize_per_stripe; - next_offset += context->sf_blocksize_per_stripe; - } - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, - total_bytes, target_datasize); + f_offset += context->sf_blocksize_per_stripe; + next_offset += context->sf_blocksize_per_stripe; + } + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_first_mpi_type() */ /*------------------------------------------------------------------------- @@ -250,57 +250,56 @@ static void H5FD__create_first_mpi_type( /* Fill the output vectors 'io_offset', 'io_datasize' and 'io_f_offset' * All calculations are in terms of bytes. */ -static void H5FD__create_final_mpi_type(subfiling_context_t *context, - int ioc_depth, int64_t src_offset, - int64_t target_datasize, - int64_t f_offset, int64_t *io_offset, - int64_t *io_datasize, - int64_t *io_f_offset, int64_t last_io) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t next_offset = src_offset; - int64_t total_bytes = 0; - - if (last_io == target_datasize) { - io_offset[0] = src_offset; - io_f_offset[0] = f_offset; - io_datasize[0] = last_io; +static void +H5FD__create_final_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, + int64_t *io_datasize, int64_t *io_f_offset, int64_t last_io) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t next_offset = src_offset; + int64_t total_bytes = 0; + + if (last_io == target_datasize) { + io_offset[0] = src_offset; + io_f_offset[0] = f_offset; + io_datasize[0] = last_io; #ifdef VERBOSE - printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, src_offset, last_io, f_offset); - fflush(stdout); + printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, last_io, + f_offset); + fflush(stdout); #endif - return; - } - - if (last_io) { - int i, k; - for (k = 0, i = 1; i < ioc_depth; i++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; + return; + } + + if (last_io) { + int i, k; + for (k = 0, i = 1; i < ioc_depth; i++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, stripe_size, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, + stripe_size, f_offset); + fflush(stdout); #endif - k++; - total_bytes += stripe_size; - f_offset += blocksize_per_stripe; - next_offset += context->sf_blocksize_per_stripe; - } + k++; + total_bytes += stripe_size; + f_offset += blocksize_per_stripe; + next_offset += context->sf_blocksize_per_stripe; + } - io_datasize[k] = last_io; - io_offset[k] = next_offset; - io_f_offset[k] = f_offset; - total_bytes += last_io; + io_datasize[k] = last_io; + io_offset[k] = next_offset; + io_f_offset[k] = f_offset; + total_bytes += last_io; - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, - total_bytes, target_datasize); + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_final_mpi_type() */ /*------------------------------------------------------------------------- @@ -325,63 +324,61 @@ static void H5FD__create_final_mpi_type(subfiling_context_t *context, *------------------------------------------------------------------------- */ -static void H5FD__create_f_l_mpi_type(subfiling_context_t *context, - int ioc_depth, int64_t src_offset, - int64_t target_datasize, int64_t f_offset, - int64_t *io_offset, int64_t *io_datasize, - int64_t *io_f_offset, int64_t first_io, - int64_t last_io) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t offset_in_stripe = f_offset % stripe_size; - int64_t next_offset = blocksize_per_stripe - offset_in_stripe; - int64_t total_bytes = first_io; - - io_offset[0] = src_offset; - io_datasize[0] = first_io; - io_f_offset[0] = f_offset; +static void +H5FD__create_f_l_mpi_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, int64_t *io_datasize, + int64_t *io_f_offset, int64_t first_io, int64_t last_io) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t offset_in_stripe = f_offset % stripe_size; + int64_t next_offset = blocksize_per_stripe - offset_in_stripe; + int64_t total_bytes = first_io; + + io_offset[0] = src_offset; + io_datasize[0] = first_io; + io_f_offset[0] = f_offset; #ifdef VERBOSE - printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, src_offset, first_io, f_offset); - fflush(stdout); + printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, src_offset, first_io, f_offset); + fflush(stdout); #endif - if (total_bytes == target_datasize) { - return; - } - - if (total_bytes) { - int k; - f_offset += (blocksize_per_stripe - offset_in_stripe); - for (k = 1; k < ioc_depth; k++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; - total_bytes += stripe_size; + if (total_bytes == target_datasize) { + return; + } + + if (total_bytes) { + int k; + f_offset += (blocksize_per_stripe - offset_in_stripe); + for (k = 1; k < ioc_depth; k++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; + total_bytes += stripe_size; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, stripe_size, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, + stripe_size, f_offset); + fflush(stdout); #endif - f_offset += blocksize_per_stripe; - next_offset += blocksize_per_stripe; - } - io_datasize[ioc_depth] = last_io; - io_f_offset[ioc_depth] = f_offset; - io_offset[ioc_depth] = next_offset; + f_offset += blocksize_per_stripe; + next_offset += blocksize_per_stripe; + } + io_datasize[ioc_depth] = last_io; + io_f_offset[ioc_depth] = f_offset; + io_offset[ioc_depth] = next_offset; #ifdef VERBOSE - printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", - __func__, k, next_offset, last_io, f_offset); - fflush(stdout); + printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, last_io, + f_offset); + fflush(stdout); #endif - total_bytes += last_io; + total_bytes += last_io; - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", - __func__, total_bytes, target_datasize); + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_f_l_mpi_type() */ /*------------------------------------------------------------------------- @@ -406,28 +403,27 @@ static void H5FD__create_f_l_mpi_type(subfiling_context_t *context, * *------------------------------------------------------------------------- */ -static void H5FD__create_mpi_uniform_type(subfiling_context_t *context, - int ioc_depth, int64_t src_offset, - int64_t target_datasize, - int64_t f_offset, int64_t *io_offset, - int64_t *io_datasize, - int64_t *io_f_offset) { - int64_t stripe_size = context->sf_stripe_size; - int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; - int64_t next_offset = src_offset + blocksize_per_stripe; - int64_t total_bytes = 0; - - io_offset[0] = src_offset; - io_datasize[0] = stripe_size; - io_f_offset[0] = f_offset; - if (target_datasize == 0) { +static void +H5FD__create_mpi_uniform_type(subfiling_context_t *context, int ioc_depth, int64_t src_offset, + int64_t target_datasize, int64_t f_offset, int64_t *io_offset, + int64_t *io_datasize, int64_t *io_f_offset) +{ + int64_t stripe_size = context->sf_stripe_size; + int64_t blocksize_per_stripe = context->sf_blocksize_per_stripe; + int64_t next_offset = src_offset + blocksize_per_stripe; + int64_t total_bytes = 0; + + io_offset[0] = src_offset; + io_datasize[0] = stripe_size; + io_f_offset[0] = f_offset; + if (target_datasize == 0) { #if 0 printf("[%s] 0: datasize=0\n", __func__); fflush(stdout); #endif - io_datasize[0] = 0; - return; - } + io_datasize[0] = 0; + return; + } #if 0 printf("[%s] 0: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", @@ -435,31 +431,31 @@ static void H5FD__create_mpi_uniform_type(subfiling_context_t *context, fflush(stdout); #endif - f_offset += blocksize_per_stripe; - total_bytes = stripe_size; + f_offset += blocksize_per_stripe; + total_bytes = stripe_size; - if (target_datasize > stripe_size) { - int k; - for (k = 1; k < ioc_depth; k++) { - io_offset[k] = next_offset; - io_datasize[k] = stripe_size; - io_f_offset[k] = f_offset; + if (target_datasize > stripe_size) { + int k; + for (k = 1; k < ioc_depth; k++) { + io_offset[k] = next_offset; + io_datasize[k] = stripe_size; + io_f_offset[k] = f_offset; #if 0 printf("[%s] %d: mem_offset=%ld, datasize=%ld, f_offset=%ld\n", __func__, k, next_offset, stripe_size, f_offset); fflush(stdout); #endif - total_bytes += stripe_size; - f_offset += blocksize_per_stripe; - next_offset += blocksize_per_stripe; - } + total_bytes += stripe_size; + f_offset += blocksize_per_stripe; + next_offset += blocksize_per_stripe; + } - if (total_bytes != target_datasize) { - printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, - total_bytes, target_datasize); + if (total_bytes != target_datasize) { + printf("Warning (%s): total_SUM(%ld) != target_bytes(%ld)\n", __func__, total_bytes, + target_datasize); + } } - } - return; + return; } /* end H5FD__create_mpi_uniform_type() */ /*------------------------------------------------------------------------- @@ -504,153 +500,149 @@ static void H5FD__create_mpi_uniform_type(subfiling_context_t *context, *------------------------------------------------------------------------- */ -int init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUSED ioc_total, - int64_t *sf_source_data_offset, int64_t *sf_datasize, - int64_t *sf_offset, int *first_index, int *n_containers, - int64_t offset, int64_t elements, - int dtype_extent) +int +init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUSED ioc_total, + int64_t *sf_source_data_offset, int64_t *sf_datasize, int64_t *sf_offset, int *first_index, + int *n_containers, int64_t offset, int64_t elements, int dtype_extent) { - subfiling_context_t *sf_context = _sf_context; - int container_count = sf_context->topology->n_io_concentrators; - int64_t stripe_size = sf_context->sf_stripe_size; - int64_t data_size = elements * dtype_extent; - - int64_t start_id = offset / stripe_size; - int64_t offset_in_stripe = offset % sf_context->sf_blocksize_per_stripe; - int64_t container_offset = offset % stripe_size; - int64_t start_length = MIN(data_size, (stripe_size - container_offset)); - int64_t start_row = start_id / container_count; - int64_t ioc_start = start_id % container_count; - int64_t final_offset = offset + data_size; - int64_t final_id = final_offset / stripe_size; - int64_t final_length = - (start_length == data_size ? 0 : final_offset % stripe_size); - int64_t ioc_final = final_id % container_count; - int64_t container_bytes = 0, total_bytes = 0; - int64_t source_offset = 0; - - int row_id_start = (int)(start_id - ioc_start); - int row_id_final = (int)(final_id - ioc_final); - int i, k, depth = ((row_id_final - row_id_start) / container_count) + 1; - int container_id = (int)start_id; - int64_t row_offset = (int64_t)(start_row * stripe_size); - - *first_index = (int)ioc_start; - - /* Given the IO parameters, we loop thru the set of IOCs - * to determine the various vector components for each. - * Those IOCs whose datasize is zero (0), will not have - * IO requests passed to them. - */ - - for (i = 0, k = (int)ioc_start; i < container_count; i++) { - /* We use 'output_offset' as an index into a linear - * version of a 2D array. In 'C' the last subscript - * is the one that varies most rapidly. - * In our case, the 2D array is represented as - * array[ container_count ][ maxdepth ] + subfiling_context_t *sf_context = _sf_context; + int container_count = sf_context->topology->n_io_concentrators; + int64_t stripe_size = sf_context->sf_stripe_size; + int64_t data_size = elements * dtype_extent; + + int64_t start_id = offset / stripe_size; + int64_t offset_in_stripe = offset % sf_context->sf_blocksize_per_stripe; + int64_t container_offset = offset % stripe_size; + int64_t start_length = MIN(data_size, (stripe_size - container_offset)); + int64_t start_row = start_id / container_count; + int64_t ioc_start = start_id % container_count; + int64_t final_offset = offset + data_size; + int64_t final_id = final_offset / stripe_size; + int64_t final_length = (start_length == data_size ? 0 : final_offset % stripe_size); + int64_t ioc_final = final_id % container_count; + int64_t container_bytes = 0, total_bytes = 0; + int64_t source_offset = 0; + + int row_id_start = (int)(start_id - ioc_start); + int row_id_final = (int)(final_id - ioc_final); + int i, k, depth = ((row_id_final - row_id_start) / container_count) + 1; + int container_id = (int)start_id; + int64_t row_offset = (int64_t)(start_row * stripe_size); + + *first_index = (int)ioc_start; + + /* Given the IO parameters, we loop thru the set of IOCs + * to determine the various vector components for each. + * Those IOCs whose datasize is zero (0), will not have + * IO requests passed to them. */ - size_t depthsize = maxdepth * sizeof(int64_t); /* ONLY used for memset */ - size_t output_offset = (size_t)(k) * maxdepth; - int container_depth = depth; - hbool_t is_first = false, is_last = false; - int64_t *__sf_source_data_offset = sf_source_data_offset + output_offset; - int64_t *__sf_datasize = sf_datasize + output_offset; - int64_t *__sf_offset = sf_offset + output_offset; + for (i = 0, k = (int)ioc_start; i < container_count; i++) { + /* We use 'output_offset' as an index into a linear + * version of a 2D array. In 'C' the last subscript + * is the one that varies most rapidly. + * In our case, the 2D array is represented as + * array[ container_count ][ maxdepth ] + */ + size_t depthsize = maxdepth * sizeof(int64_t); /* ONLY used for memset */ + size_t output_offset = (size_t)(k)*maxdepth; + int container_depth = depth; - memset(__sf_source_data_offset, 0, depthsize); - memset(__sf_datasize, 0, depthsize); - memset(__sf_offset, 0, depthsize); + hbool_t is_first = false, is_last = false; + int64_t *__sf_source_data_offset = sf_source_data_offset + output_offset; + int64_t *__sf_datasize = sf_datasize + output_offset; + int64_t *__sf_offset = sf_offset + output_offset; - container_bytes = 0; + memset(__sf_source_data_offset, 0, depthsize); + memset(__sf_datasize, 0, depthsize); + memset(__sf_offset, 0, depthsize); - if (total_bytes == data_size) { - *n_containers = i; - return depth + 1; - } - if (total_bytes < data_size) { - if (k == ioc_start) { - is_first = true; - container_bytes = start_length; - container_depth--; /* Account for the start_length */ - if (ioc_final < ioc_start) { - container_depth--; - depth--; + container_bytes = 0; + + if (total_bytes == data_size) { + *n_containers = i; + return depth + 1; } - } - if (k == ioc_final) { - is_last = true; - container_bytes += final_length; - if (container_depth) - container_depth--; /* Account for the final_length */ - if (depth) - depth--; - } - container_bytes += container_depth * stripe_size; - total_bytes += container_bytes; - } + if (total_bytes < data_size) { + if (k == ioc_start) { + is_first = true; + container_bytes = start_length; + container_depth--; /* Account for the start_length */ + if (ioc_final < ioc_start) { + container_depth--; + depth--; + } + } + if (k == ioc_final) { + is_last = true; + container_bytes += final_length; + if (container_depth) + container_depth--; /* Account for the final_length */ + if (depth) + depth--; + } + container_bytes += container_depth * stripe_size; + total_bytes += container_bytes; + } + + __sf_source_data_offset[0] = source_offset; + __sf_datasize[0] = container_bytes; + __sf_offset[0] = row_offset + offset_in_stripe; - __sf_source_data_offset[0] = source_offset; - __sf_datasize[0] = container_bytes; - __sf_offset[0] = row_offset + offset_in_stripe; - - if (container_count == 1) { - - } else { - /* Fill the IO datatypes */ - if (is_first) { - if (is_last) { /* First + Last */ - H5FD__create_f_l_mpi_type( - sf_context, container_depth + 1, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset, start_length, final_length); - } else { /* First ONLY */ - H5FD__create_first_mpi_type( - sf_context, container_depth, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset, start_length); + if (container_count == 1) { + } + else { + /* Fill the IO datatypes */ + if (is_first) { + if (is_last) { /* First + Last */ + H5FD__create_f_l_mpi_type(sf_context, container_depth + 1, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset, start_length, final_length); + } + else { /* First ONLY */ + H5FD__create_first_mpi_type(sf_context, container_depth, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset, start_length); + } + /* Move the memory pointer to the starting location + * for next IOC request. + */ + source_offset += start_length; + } + else if (is_last) { /* Last ONLY */ + H5FD__create_final_mpi_type(sf_context, container_depth, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset, final_length); + /* Probably not needed... */ + source_offset += stripe_size; + } + else { /* Everything else (uniform) */ + H5FD__create_mpi_uniform_type(sf_context, container_depth, source_offset, container_bytes, + row_offset + offset_in_stripe, __sf_source_data_offset, + __sf_datasize, __sf_offset); + source_offset += stripe_size; + } } - /* Move the memory pointer to the starting location - * for next IOC request. - */ - source_offset += start_length; - } else if (is_last) { /* Last ONLY */ - H5FD__create_final_mpi_type( - sf_context, container_depth, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset, final_length); - /* Probably not needed... */ - source_offset += stripe_size; - } else { /* Everything else (uniform) */ - H5FD__create_mpi_uniform_type( - sf_context, container_depth, source_offset, container_bytes, - row_offset + offset_in_stripe, __sf_source_data_offset, - __sf_datasize, __sf_offset); - source_offset += stripe_size; - } - } - k++; - offset_in_stripe += __sf_datasize[0]; - container_id++; + k++; + offset_in_stripe += __sf_datasize[0]; + container_id++; - if (k == container_count) { - k = 0; - offset_in_stripe = 0; - depth = ((row_id_final - container_id) / container_count) + 1; - row_offset += sf_context->sf_blocksize_per_stripe; + if (k == container_count) { + k = 0; + offset_in_stripe = 0; + depth = ((row_id_final - container_id) / container_count) + 1; + row_offset += sf_context->sf_blocksize_per_stripe; + } + } + if (total_bytes != data_size) { + printf("Error: total_bytes != data_size\n"); } - } - if (total_bytes != data_size) { - printf("Error: total_bytes != data_size\n"); - } - *n_containers = container_count; - return depth + 1; + *n_containers = container_count; + return depth + 1; } /* end init__indep_io() */ - /*------------------------------------------------------------------------- * Function: Internal read__independent_async * @@ -685,97 +677,97 @@ int init__indep_io(void *_sf_context, size_t maxdepth, int H5_ATTR_PARALLEL_UNUS #define WORLD_SIZE(ctx) ((ctx)->topology->app_layout->world_size) #define WORLD_RANK(ctx) ((ctx)->topology->app_layout->world_size) -static int read__independent_async(int n_io_concentrators, hid_t context_id, - int64_t offset, int64_t elements, - int H5_ATTR_PARALLEL_UNUSED dtype_extent, - void *data,io_req_t **io_req) { - int status = 0; - int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; - int *io_concentrator = NULL; - io_req_t *sf_io_request = NULL; - int64_t msg[3] = {0, }; - - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - - /* Calculate the IOC that we'll send the IO request to */ - stripe_size = sf_context->sf_stripe_size; - - start_id = offset / stripe_size; - ioc_row = start_id / n_io_concentrators; - ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); - - ioc_start = start_id % n_io_concentrators; - - io_concentrator = sf_context->topology->io_concentrator; - assert(io_concentrator != NULL); - - /* Make sure that we can return a request structure - * if everything is working correctly - */ - assert(io_req); - - /* Prepare an IO request. - * This gets sent to the ioc identified by the file offset - */ - msg[0] = elements; - msg[1] = ioc_offset; - msg[2] = context_id; +static int +read__independent_async(int n_io_concentrators, hid_t context_id, int64_t offset, int64_t elements, + int H5_ATTR_PARALLEL_UNUSED dtype_extent, void *data, io_req_t **io_req) +{ + int status = 0; + int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; + int * io_concentrator = NULL; + io_req_t *sf_io_request = NULL; + int64_t msg[3] = { + 0, + }; + + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + + /* Calculate the IOC that we'll send the IO request to */ + stripe_size = sf_context->sf_stripe_size; + + start_id = offset / stripe_size; + ioc_row = start_id / n_io_concentrators; + ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); + + ioc_start = start_id % n_io_concentrators; + + io_concentrator = sf_context->topology->io_concentrator; + assert(io_concentrator != NULL); + + /* Make sure that we can return a request structure + * if everything is working correctly + */ + assert(io_req); + + /* Prepare an IO request. + * This gets sent to the ioc identified by the file offset + */ + msg[0] = elements; + msg[1] = ioc_offset; + msg[2] = context_id; #ifdef VERBOSE - printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", - __func__, ioc_start, elements, offset, ioc_offset); - fflush(stdout); -#endif - status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], READ_INDEP, - sf_context->sf_msg_comm); - - if (status != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(status, estring, &len); - printf("[%d] ERROR! MPI_Send request header (%ld) " - "bytes to %d returned an error(%s)\n", - WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", __func__, ioc_start, elements, offset, + ioc_offset); fflush(stdout); - return -1; - } - - /* At this point in the new implementation, we should queue - * the async recv so that when the top level VFD tells us - * to complete all pending IO requests, we have all the info - * we need to accomplish that. - */ - sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); - assert(sf_io_request); - - sf_io_request->completion_func.io_args.ioc = (int)ioc_start; - sf_io_request->completion_func.io_args.context_id = context_id; - sf_io_request->completion_func.io_args.offset = offset; - sf_io_request->completion_func.io_args.elements = elements; - sf_io_request->completion_func.io_args.data = data; - sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; - sf_io_request->completion_func.io_function = async_completion; - sf_io_request->completion_func.pending = 0; - - sf_io_request->prev = sf_io_request->next = NULL; - /* Start the actual data transfer */ - - status = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], - READ_INDEP_DATA, sf_context->sf_data_comm, - &sf_io_request->completion_func.io_args.io_req); - - if (status == MPI_SUCCESS) { - sf_io_request->completion_func.pending = 1; - *io_req = sf_io_request; - } else { - puts("MPI_Irecv must have failed!"); - free(sf_io_request); - *io_req = NULL; - } - - return status; -} /* end read__independent_async() */ +#endif + status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], READ_INDEP, sf_context->sf_msg_comm); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d] ERROR! MPI_Send request header (%ld) " + "bytes to %d returned an error(%s)\n", + WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + fflush(stdout); + return -1; + } + + /* At this point in the new implementation, we should queue + * the async recv so that when the top level VFD tells us + * to complete all pending IO requests, we have all the info + * we need to accomplish that. + */ + sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); + assert(sf_io_request); + + sf_io_request->completion_func.io_args.ioc = (int)ioc_start; + sf_io_request->completion_func.io_args.context_id = context_id; + sf_io_request->completion_func.io_args.offset = offset; + sf_io_request->completion_func.io_args.elements = elements; + sf_io_request->completion_func.io_args.data = data; + sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; + sf_io_request->completion_func.io_function = async_completion; + sf_io_request->completion_func.pending = 0; + + sf_io_request->prev = sf_io_request->next = NULL; + /* Start the actual data transfer */ + + status = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], READ_INDEP_DATA, + sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req); + + if (status == MPI_SUCCESS) { + sf_io_request->completion_func.pending = 1; + *io_req = sf_io_request; + } + else { + puts("MPI_Irecv must have failed!"); + free(sf_io_request); + *io_req = NULL; + } + + return status; +} /* end read__independent_async() */ /*------------------------------------------------------------------------- * Function: get_ioc_subfile_path @@ -793,23 +785,23 @@ static int read__independent_async(int n_io_concentrators, hid_t context_id, * Return: A full filepath which should be copied, e.g. using strdup *------------------------------------------------------------------------- */ -static char *get_ioc_subfile_path(int ioc, int ioc_count, - subfiling_context_t *sf_context) { - static char filepath[PATH_MAX]; - char *subfile_dir = NULL; - char *prefix = sf_context->subfile_prefix; - - if (prefix != NULL) { - sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, - sf_context->h5_file_id, ioc, ioc_count); - } else { - strcpy(filepath, sf_context->filename); - subfile_dir = strrchr(filepath, '/'); - assert(subfile_dir); - sprintf(subfile_dir + 1, SF_FILENAME_TEMPLATE, sf_context->h5_file_id, ioc, - ioc_count); - } - return filepath; +static char * +get_ioc_subfile_path(int ioc, int ioc_count, subfiling_context_t *sf_context) +{ + static char filepath[PATH_MAX]; + char * subfile_dir = NULL; + char * prefix = sf_context->subfile_prefix; + + if (prefix != NULL) { + sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, sf_context->h5_file_id, ioc, ioc_count); + } + else { + strcpy(filepath, sf_context->h5_filename); + subfile_dir = strrchr(filepath, '/'); + assert(subfile_dir); + sprintf(subfile_dir + 1, SF_FILENAME_TEMPLATE, sf_context->h5_file_id, ioc, ioc_count); + } + return filepath; } /* end get_ioc_subfile_path() */ /*------------------------------------------------------------------------- @@ -831,10 +823,12 @@ static char *get_ioc_subfile_path(int ioc, int ioc_count, * values (-1) indicates an error. *------------------------------------------------------------------------- */ -static int progress_this_pending_io(io_req_t *this_req) { - assert(this_req); - assert(this_req->completion_func.io_function); - return (*this_req->completion_func.io_function)(&this_req->completion_func); +static int +progress_this_pending_io(io_req_t *this_req) +{ + assert(this_req); + assert(this_req->completion_func.io_function); + return (*this_req->completion_func.io_function)(&this_req->completion_func); } /*------------------------------------------------------------------------- @@ -848,25 +842,26 @@ static int progress_this_pending_io(io_req_t *this_req) { * values (-1) indicates an error. *------------------------------------------------------------------------- */ -static int write_data(io_func_t *this_func) { - int ioc, status; - int64_t elements; - void *data; - int *io_concentrator = NULL; - subfiling_context_t *sf_context = NULL; - assert(this_func); +static int +write_data(io_func_t *this_func) +{ + int ioc, status; + int64_t elements; + void * data; + int * io_concentrator = NULL; + subfiling_context_t *sf_context = NULL; + assert(this_func); - sf_context = get__subfiling_object(this_func->io_args.context_id); + sf_context = get__subfiling_object(this_func->io_args.context_id); - assert(sf_context); + assert(sf_context); - io_concentrator = sf_context->topology->io_concentrator; - ioc = this_func->io_args.ioc; + io_concentrator = sf_context->topology->io_concentrator; + ioc = this_func->io_args.ioc; - status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc], - WRITE_INDEP_DATA, sf_context->sf_data_comm, - &this_func->io_args.io_req); - return status; + status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc], WRITE_INDEP_DATA, + sf_context->sf_data_comm, &this_func->io_args.io_req); + return status; } /*------------------------------------------------------------------------- @@ -888,43 +883,44 @@ static int write_data(io_func_t *this_func) { * values (-1) indicates an error. *------------------------------------------------------------------------- */ -static int async_completion(void *arg) { - struct async_arg { - int n_reqs; - MPI_Request *sf_reqs; - } *in_progress = (struct async_arg *)arg; - - assert(arg); - int status, errors = 0; - int count = in_progress->n_reqs; - int n_waiting = count; - int indices[count]; - MPI_Status stats[count]; - useconds_t delay = 5; - - while (n_waiting) { - int i, ready = 0; - status = MPI_Testsome(count, in_progress->sf_reqs, &ready, indices, stats); - if (status != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(status, estring, &len); - printf("[%s] MPI_ERROR! MPI_Testsome returned an error(%s)\n", __func__, - estring); - fflush(stdout); - errors++; - return -1; - } +static int +async_completion(void *arg) +{ + struct async_arg { + int n_reqs; + MPI_Request *sf_reqs; + } *in_progress = (struct async_arg *)arg; + + assert(arg); + int status, errors = 0; + int count = in_progress->n_reqs; + int n_waiting = count; + int indices[count]; + MPI_Status stats[count]; + useconds_t delay = 5; + + while (n_waiting) { + int i, ready = 0; + status = MPI_Testsome(count, in_progress->sf_reqs, &ready, indices, stats); + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%s] MPI_ERROR! MPI_Testsome returned an error(%s)\n", __func__, estring); + fflush(stdout); + errors++; + return -1; + } - if (ready == 0) { - usleep(delay); - } + if (ready == 0) { + usleep(delay); + } - for (i = 0; i < ready; i++) { - n_waiting--; + for (i = 0; i < ready; i++) { + n_waiting--; + } } - } - return errors; + return errors; } /*------------------------------------------------------------------------- @@ -960,137 +956,138 @@ static int async_completion(void *arg) { * Changes: Initial Version/None. *------------------------------------------------------------------------- */ -static int write__independent_async(int n_io_concentrators, hid_t context_id, - int64_t offset, int64_t elements, - int H5_ATTR_PARALLEL_UNUSED dtype_extent, - const void *data, io_req_t **io_req) { - - int ack = 0, active_sends = 0, n_waiting = 0, status = 0; - int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; - int *io_concentrator = NULL; - io_req_t *sf_io_request = NULL; - MPI_Request ackrequest; - int64_t msg[3] = {0, }; - - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - - /* Calculate the IOC that we'll send the IO request to */ - stripe_size = sf_context->sf_stripe_size; - - start_id = offset / stripe_size; - ioc_row = start_id / n_io_concentrators; - ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); - ioc_start = start_id % n_io_concentrators; - - io_concentrator = sf_context->topology->io_concentrator; - assert(io_concentrator != NULL); - - /* Make sure that we can return a request structure - * if everything is working correctly - */ - assert(io_req); - - - /* Prepare an IO request. - * This gets sent to the ioc identified by the file offset. - * (see above: Calculate the IOC)) - */ - msg[0] = elements; - msg[1] = ioc_offset; - msg[2] = context_id; +static int +write__independent_async(int n_io_concentrators, hid_t context_id, int64_t offset, int64_t elements, + int H5_ATTR_PARALLEL_UNUSED dtype_extent, const void *data, io_req_t **io_req) +{ + + int ack = 0, active_sends = 0, n_waiting = 0, status = 0; + int64_t stripe_size, ioc_row, start_id, ioc_start, ioc_offset; + int * io_concentrator = NULL; + io_req_t * sf_io_request = NULL; + MPI_Request ackrequest; + int64_t msg[3] = { + 0, + }; + + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + + /* Calculate the IOC that we'll send the IO request to */ + stripe_size = sf_context->sf_stripe_size; + + start_id = offset / stripe_size; + ioc_row = start_id / n_io_concentrators; + ioc_offset = (offset % stripe_size) + (ioc_row * stripe_size); + ioc_start = start_id % n_io_concentrators; + + io_concentrator = sf_context->topology->io_concentrator; + assert(io_concentrator != NULL); + + /* Make sure that we can return a request structure + * if everything is working correctly + */ + assert(io_req); + + /* Prepare an IO request. + * This gets sent to the ioc identified by the file offset. + * (see above: Calculate the IOC)) + */ + msg[0] = elements; + msg[1] = ioc_offset; + msg[2] = context_id; #ifdef VERBOSE - printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", - __func__, ioc_start, elements, offset, ioc_offset); - fflush(stdout); -#endif - status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], - WRITE_INDEP, sf_context->sf_msg_comm); - - if (status != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(status, estring, &len); - printf("[%d] ERROR! MPI_Send of %ld bytes to %d returned an " - "error(%s)\n", - WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); - fflush(stdout); - return -1; - } else - active_sends++; - /* - * We wait for memory to be allocated on the target IOC so that we can - * start sending user data. Once memory is allocated, we will receive - * an ACK (or NACK) message from the IOC to allow us to proceed. - */ - status = MPI_Irecv(&ack, 1, MPI_INT, io_concentrator[ioc_start], - WRITE_INDEP_ACK, sf_context->sf_data_comm, &ackrequest); - - if (status != MPI_SUCCESS) { - printf("[%d %s] MPI_Irecv failed\n", WORLD_RANK(sf_context), __func__); + printf("[%s ioc(%ld)] elements=%ld, offset=%ld, file_offset=%ld\n", __func__, ioc_start, elements, offset, + ioc_offset); fflush(stdout); - return -1; - } +#endif + status = MPI_Send(msg, 3, MPI_INT64_T, io_concentrator[ioc_start], WRITE_INDEP, sf_context->sf_msg_comm); + + if (status != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(status, estring, &len); + printf("[%d] ERROR! MPI_Send of %ld bytes to %d returned an " + "error(%s)\n", + WORLD_RANK(sf_context), sizeof(msg), io_concentrator[ioc_start], estring); + fflush(stdout); + return -1; + } + else + active_sends++; + /* + * We wait for memory to be allocated on the target IOC so that we can + * start sending user data. Once memory is allocated, we will receive + * an ACK (or NACK) message from the IOC to allow us to proceed. + */ + status = MPI_Irecv(&ack, 1, MPI_INT, io_concentrator[ioc_start], WRITE_INDEP_ACK, + sf_context->sf_data_comm, &ackrequest); - n_waiting = active_sends; + if (status != MPI_SUCCESS) { + printf("[%d %s] MPI_Irecv failed\n", WORLD_RANK(sf_context), __func__); + fflush(stdout); + return -1; + } - while (n_waiting) { - int flag = 0; - status = MPI_Test(&ackrequest, &flag, MPI_STATUS_IGNORE); - if (status == MPI_SUCCESS) { - if (flag == 0) - usleep(0); - else { - n_waiting--; - if (ack == 0) { /* NACK */ - printf("%s - Received NACK!\n", __func__); + n_waiting = active_sends; + + while (n_waiting) { + int flag = 0; + status = MPI_Test(&ackrequest, &flag, MPI_STATUS_IGNORE); + if (status == MPI_SUCCESS) { + if (flag == 0) + usleep(0); + else { + n_waiting--; + if (ack == 0) { /* NACK */ + printf("%s - Received NACK!\n", __func__); + } + } } - } } - } - - /* At this point in the new implementation, we should queue - * the async write so that when the top level VFD tells us - * to complete all pending IO requests, we have all the info - * we need to accomplish that. - */ - sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); - assert(sf_io_request); - - sf_io_request->completion_func.io_args.ioc = (int)ioc_start; - sf_io_request->completion_func.io_args.context_id = context_id; - sf_io_request->completion_func.io_args.offset = offset; - sf_io_request->completion_func.io_args.elements = elements; - sf_io_request->completion_func.io_args.data = cast_to_void(data); - sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; - sf_io_request->completion_func.io_function = async_completion; - sf_io_request->completion_func.pending = 0; - - sf_io_request->prev = sf_io_request->next = NULL; - /* Start the actual data transfer */ - - status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], - WRITE_INDEP_DATA, sf_context->sf_data_comm, - &sf_io_request->completion_func.io_args.io_req); - - /* When we actually have the async IO support, - * the request should be queued before we - * return to the caller. - * Having queued the IO operation, we might want to - * get additional work started before allowing the - * queued IO requests to make further progress and/or - * to complete, so we just return to the caller. - */ - - if (status == MPI_SUCCESS) { - sf_io_request->completion_func.pending = 1; - *io_req = sf_io_request; - } else { - puts("MPI_Isend must have failed!"); - free(sf_io_request); - *io_req = NULL; - } - return status; + + /* At this point in the new implementation, we should queue + * the async write so that when the top level VFD tells us + * to complete all pending IO requests, we have all the info + * we need to accomplish that. + */ + sf_io_request = (io_req_t *)malloc(sizeof(io_req_t)); + assert(sf_io_request); + + sf_io_request->completion_func.io_args.ioc = (int)ioc_start; + sf_io_request->completion_func.io_args.context_id = context_id; + sf_io_request->completion_func.io_args.offset = offset; + sf_io_request->completion_func.io_args.elements = elements; + sf_io_request->completion_func.io_args.data = cast_to_void(data); + sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL; + sf_io_request->completion_func.io_function = async_completion; + sf_io_request->completion_func.pending = 0; + + sf_io_request->prev = sf_io_request->next = NULL; + /* Start the actual data transfer */ + + status = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrator[ioc_start], WRITE_INDEP_DATA, + sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req); + + /* When we actually have the async IO support, + * the request should be queued before we + * return to the caller. + * Having queued the IO operation, we might want to + * get additional work started before allowing the + * queued IO requests to make further progress and/or + * to complete, so we just return to the caller. + */ + + if (status == MPI_SUCCESS) { + sf_io_request->completion_func.pending = 1; + *io_req = sf_io_request; + } + else { + puts("MPI_Isend must have failed!"); + free(sf_io_request); + *io_req = NULL; + } + return status; } /* end write__independent_async() */ /* @@ -1108,81 +1105,82 @@ static int write__independent_async(int n_io_concentrators, hid_t context_id, * * Return: SUCCEED if no errors, FAIL otherwise. */ -herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, - haddr_t addrs[], size_t sizes[], - const void *bufs[] /* data_in */) { - herr_t ret_value = SUCCEED; - hssize_t status = 0, k = 0; - hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); - subfiling_context_t *sf_context = NULL; - io_req_t **sf_async_reqs = NULL; - MPI_Request *active_reqs = NULL; - struct __mpi_req { - int n_reqs; - MPI_Request *active_reqs; - } *mpi_reqs = NULL; - - sf_context = get__subfiling_object(sf_context_id); - assert(sf_context != NULL); - - active_reqs = (MPI_Request *)calloc((size_t)(count+2), sizeof(MPI_Request)); - assert(active_reqs); - - sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); - assert(sf_async_reqs); - - /* - * Note: We allocated extra space in the active_requests (above). - * The extra should be enough for an integer plus a pointer. - */ - mpi_reqs = (struct __mpi_req *)&active_reqs[count]; - mpi_reqs->n_reqs = (int)count; - mpi_reqs->active_reqs = active_reqs; - - /* Each pass thru the following should queue an MPI write - * to a new IOC. Both the IOC selection and offset within the - * particular subfile are based on the combinatation of striping - * factors and the virtual file offset (addrs[k]). - */ - for (k = 0; k < count; k++) { - if (sizes[k] == 0) { - puts("Something wrong with the size argument: size is 0!"); - fflush(stdout); - } - status = write__independent_async( - sf_context->topology->n_io_concentrators, sf_context_id, - (int64_t)addrs[k],(int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); - if (status < 0) { - printf("%s - encountered an internal error!\n", __func__); - goto errors; - } else { - mpi_reqs->active_reqs[k] = - sf_async_reqs[k]->completion_func.io_args.io_req; - } - } +herr_t +H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], + const void *bufs[] /* data_in */) +{ + herr_t ret_value = SUCCEED; + hssize_t status = 0, k = 0; + hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); + subfiling_context_t *sf_context = NULL; + io_req_t ** sf_async_reqs = NULL; + MPI_Request * active_reqs = NULL; + struct __mpi_req { + int n_reqs; + MPI_Request *active_reqs; + } *mpi_reqs = NULL; + + sf_context = get__subfiling_object(sf_context_id); + assert(sf_context != NULL); - /* Here, we should have queued 'count' async requests. - * We can can now try to complete those before returning - * to the caller for the next set of IO operations. - */ - if (sf_async_reqs[0]->completion_func.io_function) - ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); + active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(MPI_Request)); + assert(active_reqs); - if (active_reqs) - free(active_reqs); + sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); + assert(sf_async_reqs); - if (sf_async_reqs) { + /* + * Note: We allocated extra space in the active_requests (above). + * The extra should be enough for an integer plus a pointer. + */ + mpi_reqs = (struct __mpi_req *)&active_reqs[count]; + mpi_reqs->n_reqs = (int)count; + mpi_reqs->active_reqs = active_reqs; + + /* Each pass thru the following should queue an MPI write + * to a new IOC. Both the IOC selection and offset within the + * particular subfile are based on the combinatation of striping + * factors and the virtual file offset (addrs[k]). + */ for (k = 0; k < count; k++) { - if (sf_async_reqs[k]) { - free(sf_async_reqs[k]); - } + if (sizes[k] == 0) { + puts("Something wrong with the size argument: size is 0!"); + fflush(stdout); + } + status = + write__independent_async(sf_context->topology->n_io_concentrators, sf_context_id, + (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); + if (status < 0) { + printf("%s - encountered an internal error!\n", __func__); + goto errors; + } + else { + mpi_reqs->active_reqs[k] = sf_async_reqs[k]->completion_func.io_args.io_req; + } + } + + /* Here, we should have queued 'count' async requests. + * We can can now try to complete those before returning + * to the caller for the next set of IO operations. + */ + if (sf_async_reqs[0]->completion_func.io_function) + ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); + + if (active_reqs) + free(active_reqs); + + if (sf_async_reqs) { + for (k = 0; k < count; k++) { + if (sf_async_reqs[k]) { + free(sf_async_reqs[k]); + } + } + free(sf_async_reqs); } - free(sf_async_reqs); - } - return ret_value; + return ret_value; errors: - return FAIL; + return FAIL; } /* @@ -1190,91 +1188,95 @@ errors: * The H5FD__ioc_read_vector VFD call included additional 'hid_t dxpl' * and 'H5FD_mem_t types[]'. These are now removed. */ -herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], - size_t sizes[], - void *bufs[] /* data_out */) { - herr_t ret_value = SUCCEED; - hssize_t status = 0, k = 0; - hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); - subfiling_context_t *sf_context = NULL; - io_req_t **sf_async_reqs = NULL; - MPI_Request *active_reqs = NULL; - struct __mpi_req { - int n_reqs; - MPI_Request *active_reqs; - } *mpi_reqs = NULL; - - sf_context = get__subfiling_object(sf_context_id); - assert(sf_context != NULL); - - active_reqs = (MPI_Request *)calloc((size_t)(count+2), sizeof(MPI_Request)); - assert(active_reqs); - - sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); - assert(sf_async_reqs); - - /* - * Note: We allocated extra space in the active_requests (above). - * The extra should be enough for an integer plus a pointer. - */ - mpi_reqs = (struct __mpi_req *)&active_reqs[count]; - mpi_reqs->n_reqs = (int)count; - mpi_reqs->active_reqs = active_reqs; - - for (k = 0; k < count; k++) { - status = read__independent_async( - sf_context->topology->n_io_concentrators, sf_context_id, - (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); - if (status < 0) { - printf("%s - encountered an internal error!\n", __func__); - goto errors; - } else { - mpi_reqs->active_reqs[k] = - sf_async_reqs[k]->completion_func.io_args.io_req; - } - } - /* Here, we should have queued 'count' async requests - * (one to each required IOC). - * - * We can can now try to complete those before returning - * to the caller for the next set of IO operations. - */ - if (sf_async_reqs[0]->completion_func.io_function) - ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); - - if (active_reqs) - free(active_reqs); - - if (sf_async_reqs) { +herr_t +H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], + void *bufs[] /* data_out */) +{ + herr_t ret_value = SUCCEED; + hssize_t status = 0, k = 0; + hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); + subfiling_context_t *sf_context = NULL; + io_req_t ** sf_async_reqs = NULL; + MPI_Request * active_reqs = NULL; + struct __mpi_req { + int n_reqs; + MPI_Request *active_reqs; + } *mpi_reqs = NULL; + + sf_context = get__subfiling_object(sf_context_id); + assert(sf_context != NULL); + + active_reqs = (MPI_Request *)calloc((size_t)(count + 2), sizeof(MPI_Request)); + assert(active_reqs); + + sf_async_reqs = (io_req_t **)calloc((size_t)count, sizeof(void *)); + assert(sf_async_reqs); + + /* + * Note: We allocated extra space in the active_requests (above). + * The extra should be enough for an integer plus a pointer. + */ + mpi_reqs = (struct __mpi_req *)&active_reqs[count]; + mpi_reqs->n_reqs = (int)count; + mpi_reqs->active_reqs = active_reqs; + for (k = 0; k < count; k++) { - if (sf_async_reqs[k]) { - free(sf_async_reqs[k]); - } + status = read__independent_async(sf_context->topology->n_io_concentrators, sf_context_id, + (int64_t)addrs[k], (int64_t)sizes[k], 1, bufs[k], &sf_async_reqs[k]); + if (status < 0) { + printf("%s - encountered an internal error!\n", __func__); + goto errors; + } + else { + mpi_reqs->active_reqs[k] = sf_async_reqs[k]->completion_func.io_args.io_req; + } } - free(sf_async_reqs); - } - return ret_value; + /* Here, we should have queued 'count' async requests + * (one to each required IOC). + * + * We can can now try to complete those before returning + * to the caller for the next set of IO operations. + */ + if (sf_async_reqs[0]->completion_func.io_function) + ret_value = (*sf_async_reqs[0]->completion_func.io_function)(mpi_reqs); + + if (active_reqs) + free(active_reqs); + + if (sf_async_reqs) { + for (k = 0; k < count; k++) { + if (sf_async_reqs[k]) { + free(sf_async_reqs[k]); + } + } + free(sf_async_reqs); + } + return ret_value; errors: - return FAIL; + return FAIL; } -int sf_truncate(hid_t h5_fid, haddr_t H5_ATTR_PARALLEL_UNUSED addr) { - hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); - subfiling_context_t *sf_context = get__subfiling_object(sf_context_id); +int +sf_truncate(hid_t h5_fid, haddr_t H5_ATTR_PARALLEL_UNUSED addr) +{ + hid_t sf_context_id = fid_map_to_context((uint64_t)h5_fid); + subfiling_context_t *sf_context = get__subfiling_object(sf_context_id); - assert(sf_context != NULL); - return 0; + assert(sf_context != NULL); + return 0; } -int sf_shutdown_local_ioc(hid_t fid) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - if (sf_context->topology->rank_is_ioc) { - sf_shutdown_flag = 1; - } - return 0; +int +sf_shutdown_local_ioc(hid_t fid) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + if (sf_context->topology->rank_is_ioc) { + sf_shutdown_flag = 1; + } + return 0; } /*------------------------------------------------------------------------- @@ -1339,119 +1341,121 @@ int sf_shutdown_local_ioc(hid_t fid) { * Changes: Initial Version/None. *------------------------------------------------------------------------- */ -int ioc_main(int64_t context_id) { - int subfile_rank; - int flag, ret; - int max_work_depth; - int shutdown_requested; - MPI_Status status, msg_status; - sf_work_request_t *incoming_requests = NULL; - useconds_t delay = 20; - subfiling_context_t *context = get__subfiling_object(context_id); - double queue_start_time; - - assert(context != NULL); - /* We can't have opened any files at this point.. - * The file open approach has changed so that the normal - * application rank (hosting this thread) does the file open. - * We can simply utilize the file descriptor (which should now - * represent an open file). - */ - - subfile_rank = context->sf_group_rank; - - if (request_count_per_rank == NULL) { - request_count_per_rank = (int *)calloc((size_t)WORLD_SIZE(context), sizeof(int)); - assert(request_count_per_rank != NULL); - } - - max_work_depth = MAX(8, WORLD_SIZE(context) * MAX_WORK_PER_RANK); - incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth + 1), - sizeof(sf_work_request_t)); - - /* Validate that the allocation succeeded */ - assert(incoming_requests != NULL); - - /* Initialize atomic vars */ - atomic_init(&sf_workinprogress, 0); - atomic_init(&sf_work_pending, 0); - atomic_init(&sf_file_close_count, 0); - atomic_init(&sf_file_refcount, 0); - atomic_init(&sf_ioc_fini_refcount, 0); - atomic_init(&sf_ioc_ready, 1); - shutdown_requested = 0; - - while (!shutdown_requested || sf_work_pending) { - flag = 0; - ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, - &status); - if ((ret == MPI_SUCCESS) && (flag != 0)) { - sf_work_request_t *msg = NULL; - int count; - int index = 0; - int request_size = (int)sizeof(sf_work_request_t); - int source = status.MPI_SOURCE; - int tag = status.MPI_TAG; - - MPI_Get_count(&status, MPI_BYTE, &count); - if (count > request_size) { - msg = (sf_work_request_t *)malloc((size_t)count); - ret = MPI_Recv(msg, count, MPI_BYTE, source, tag, context->sf_msg_comm, - &msg_status); - } else { - index = atomic_load(&sf_workinprogress); - ret = MPI_Recv(&incoming_requests[index], count, MPI_BYTE, source, tag, - context->sf_msg_comm, &msg_status); - if (MPI_SUCCESS == ret) { - int howmany = 0; - MPI_Get_count(&msg_status, MPI_BYTE, &howmany); - if (howmany != count) { - printf("%s: MPI_Recv completed %d bytes of %d\n", __func__, howmany, - count); - fflush(stdout); - } +int +ioc_main(int64_t context_id) +{ + int subfile_rank; + int flag, ret; + int max_work_depth; + int shutdown_requested; + MPI_Status status, msg_status; + sf_work_request_t * incoming_requests = NULL; + useconds_t delay = 20; + subfiling_context_t *context = get__subfiling_object(context_id); + double queue_start_time; + + assert(context != NULL); + /* We can't have opened any files at this point.. + * The file open approach has changed so that the normal + * application rank (hosting this thread) does the file open. + * We can simply utilize the file descriptor (which should now + * represent an open file). + */ + + subfile_rank = context->sf_group_rank; + + if (request_count_per_rank == NULL) { + request_count_per_rank = (int *)calloc((size_t)WORLD_SIZE(context), sizeof(int)); + assert(request_count_per_rank != NULL); + } + + max_work_depth = MAX(8, WORLD_SIZE(context) * MAX_WORK_PER_RANK); + incoming_requests = (sf_work_request_t *)calloc((size_t)(max_work_depth + 1), sizeof(sf_work_request_t)); + + /* Validate that the allocation succeeded */ + assert(incoming_requests != NULL); + + /* Initialize atomic vars */ + atomic_init(&sf_workinprogress, 0); + atomic_init(&sf_work_pending, 0); + atomic_init(&sf_file_close_count, 0); + atomic_init(&sf_file_refcount, 0); + atomic_init(&sf_ioc_fini_refcount, 0); + atomic_init(&sf_ioc_ready, 1); + shutdown_requested = 0; + + while (!shutdown_requested || sf_work_pending) { + flag = 0; + ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status); + if ((ret == MPI_SUCCESS) && (flag != 0)) { + sf_work_request_t *msg = NULL; + int count; + int index = 0; + int request_size = (int)sizeof(sf_work_request_t); + int source = status.MPI_SOURCE; + int tag = status.MPI_TAG; + + MPI_Get_count(&status, MPI_BYTE, &count); + if (count > request_size) { + msg = (sf_work_request_t *)malloc((size_t)count); + ret = MPI_Recv(msg, count, MPI_BYTE, source, tag, context->sf_msg_comm, &msg_status); + } + else { + index = atomic_load(&sf_workinprogress); + ret = MPI_Recv(&incoming_requests[index], count, MPI_BYTE, source, tag, context->sf_msg_comm, + &msg_status); + if (MPI_SUCCESS == ret) { + int howmany = 0; + MPI_Get_count(&msg_status, MPI_BYTE, &howmany); + if (howmany != count) { + printf("%s: MPI_Recv completed %d bytes of %d\n", __func__, howmany, count); + fflush(stdout); + } + } + } + queue_start_time = MPI_Wtime(); + if (ret == MPI_SUCCESS) { + if (msg) { + printf("%s: non-std msg=(%p) from %d\n", __func__, (void *)msg, source); + fflush(stdout); + + msg->source = source; + msg->subfile_rank = subfile_rank; + msg->context_id = context->sf_context_id; + msg->start_time = queue_start_time; + tpool_add_work(msg); + } + else { + incoming_requests[index].tag = tag; + incoming_requests[index].source = source; + incoming_requests[index].subfile_rank = subfile_rank; + incoming_requests[index].start_time = queue_start_time; + incoming_requests[index].buffer = NULL; + incoming_requests[index].completed = 0; + tpool_add_work(&incoming_requests[index]); + if (index == max_work_depth - 1) { + atomic_init(&sf_workinprogress, 0); + } + else { + atomic_fetch_add(&sf_workinprogress, 1); // atomic + } + } + } } - } - queue_start_time = MPI_Wtime(); - if (ret == MPI_SUCCESS) { - if (msg) { - printf("%s: non-std msg=(%p) from %d\n", __func__, (void *)msg, source); - fflush(stdout); - - msg->source = source; - msg->subfile_rank = subfile_rank; - msg->context_id = context->sf_context_id; - msg->start_time = queue_start_time; - tpool_add_work(msg); - } else { - incoming_requests[index].tag = tag; - incoming_requests[index].source = source; - incoming_requests[index].subfile_rank = subfile_rank; - incoming_requests[index].start_time = queue_start_time; - incoming_requests[index].buffer = NULL; - incoming_requests[index].completed = 0; - tpool_add_work(&incoming_requests[index]); - if (index == max_work_depth - 1) { - atomic_init(&sf_workinprogress, 0); - } else { - atomic_fetch_add(&sf_workinprogress, 1); // atomic - } + else { + usleep(delay); } - } - } else { - usleep(delay); + shutdown_requested = sf_shutdown_flag; } - shutdown_requested = sf_shutdown_flag; - } - if (incoming_requests) { - free(incoming_requests); - } + if (incoming_requests) { + free(incoming_requests); + } - /* Reset the shutdown flag */ - sf_shutdown_flag = 0; + /* Reset the shutdown flag */ + sf_shutdown_flag = 0; - return 0; + return 0; } /* @@ -1460,33 +1464,35 @@ Private helper functions ========================================= */ -static int send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm) { - int ack = 1; - int ret = MPI_Send(&ack, 1, MPI_INT, target, tag, comm); +static int +send_ack__(int target, int subfile_rank, int tag, MPI_Comm comm) +{ + int ack = 1; + int ret = MPI_Send(&ack, 1, MPI_INT, target, tag, comm); #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - fprintf(sf_logfile, "[ioc(%d): Sending ACK to MPI_rank(%d)\n", - subfile_rank, target); + if (sf_verbose_flag) { + if (sf_logfile) { + fprintf(sf_logfile, "[ioc(%d): Sending ACK to MPI_rank(%d)\n", subfile_rank, target); + } } - } #endif - return ret; + return ret; } -static int send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm) { - int nack = 0; - int ret = MPI_Send(&nack, 1, MPI_INT, target, tag, comm); +static int +send_nack__(int target, int subfile_rank, int tag, MPI_Comm comm) +{ + int nack = 0; + int ret = MPI_Send(&nack, 1, MPI_INT, target, tag, comm); #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - fprintf(sf_logfile, "[ioc(%d): Sending NACK to MPI_rank(%d)\n", - subfile_rank, target); + if (sf_verbose_flag) { + if (sf_logfile) { + fprintf(sf_logfile, "[ioc(%d): Sending NACK to MPI_rank(%d)\n", subfile_rank, target); + } } - } #endif - return ret; + return ret; } /* @@ -1517,106 +1523,115 @@ from the thread pool threads... * *------------------------------------------------------------------------- */ -int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, - MPI_Comm comm) { - int fd; - char *recv_buffer = NULL; - int ret = MPI_SUCCESS; - MPI_Status msg_status; - int64_t data_size = msg->header[0]; - int64_t file_offset = msg->header[1]; - int64_t file_context_id = msg->header[2]; - double t_start, t_end; - double t_write, t_wait, t_queue_delay; - subfiling_context_t *sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - /* flag that we've attempted to write data to the file */ - sf_context->sf_write_count++; - /* For debugging performance */ - sf_write_ops++; - - t_start = MPI_Wtime(); - t_queue_delay = t_start - msg->start_time; +int +queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + int fd; + char * recv_buffer = NULL; + int ret = MPI_SUCCESS; + MPI_Status msg_status; + int64_t data_size = msg->header[0]; + int64_t file_offset = msg->header[1]; + int64_t file_context_id = msg->header[2]; + double t_start, t_end; + double t_write, t_wait, t_queue_delay; + subfiling_context_t *sf_context = get__subfiling_object(file_context_id); + int64_t stripe_id = file_offset + data_size; + haddr_t sf_eof; + assert(sf_context != NULL); + + sf_eof = (haddr_t)(stripe_id % sf_context->sf_stripe_size); + stripe_id /= sf_context->sf_stripe_size; + sf_eof += (haddr_t)((stripe_id * sf_context->sf_blocksize_per_stripe) + sf_context->sf_base_addr); + + /* flag that we've attempted to write data to the file */ + sf_context->sf_write_count++; + /* For debugging performance */ + sf_write_ops++; + + t_start = MPI_Wtime(); + t_queue_delay = t_start - msg->start_time; #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - fprintf(sf_logfile, - "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, " - "queue_delay = %lf seconds\n", - subfile_rank, __func__, source, data_size, file_offset, - t_queue_delay); + if (sf_verbose_flag) { + if (sf_logfile) { + fprintf(sf_logfile, + "[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, " + "queue_delay = %lf seconds\n", + subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + } } - } #endif - if (recv_buffer == NULL) { - if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) { - perror("malloc"); - send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm); - return -1; + if (recv_buffer == NULL) { + if ((recv_buffer = (char *)malloc((size_t)data_size)) == NULL) { + perror("malloc"); + send_nack__(source, subfile_rank, WRITE_INDEP_ACK, comm); + return -1; + } } - } - send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm); - ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, - WRITE_INDEP_DATA, comm, &msg_status); + send_ack__(source, subfile_rank, WRITE_INDEP_ACK, comm); + ret = MPI_Recv(recv_buffer, (int)data_size, MPI_BYTE, source, WRITE_INDEP_DATA, comm, &msg_status); - t_end = MPI_Wtime(); - t_wait = t_end - t_start; - sf_write_wait_time += t_wait; - t_start = t_end; + t_end = MPI_Wtime(); + t_wait = t_end - t_start; + sf_write_wait_time += t_wait; + t_start = t_end; #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - fprintf(sf_logfile, - "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", - subfile_rank, __func__, data_size, source, ret); + if (sf_verbose_flag) { + if (sf_logfile) { + fprintf(sf_logfile, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", subfile_rank, + __func__, data_size, source, ret); + } } - } #endif - if (ret != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(ret, estring, &len); - printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d " - "returned an error(%s)\n", - subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source, - estring); - fflush(stdout); - return ret; - } + if (ret != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(ret, estring, &len); + printf("[ioc(%d) %s] MPI_ERROR(%d)! MPI_Recv of %ld bytes from %d " + "returned an error(%s)\n", + subfile_rank, __func__, msg_status.MPI_ERROR, data_size, source, estring); + fflush(stdout); + return ret; + } + + fd = sf_context->sf_fid; + if (fd < 0) { + printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", subfile_rank, __func__, fd); + fflush(stdout); + } + else { + if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank) < 0) { + free(recv_buffer); + recv_buffer = NULL; + printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, __func__); + fflush(stdout); + return -1; + } + t_end = MPI_Wtime(); + t_write = t_end - t_start; + sf_pwrite_time += t_write; + } + + sf_queue_delay_time += t_queue_delay; + + /* Done... */ + msg->completed = 1; + if (sf_eof > sf_context->sf_eof) + sf_context->sf_eof = sf_eof; - fd = sf_context->sf_fid; - if (fd < 0) { - printf("[ioc(%d)] WARNING: %s called while subfile_fid = %d (closed)\n", - subfile_rank, __func__, fd); +#ifdef VERBOSE + printf("[ioc(%d)] %s local sf_eof = %ld sf_context=%p\n", subfile_rank, __func__, sf_context->sf_eof, + (void *)sf_context); fflush(stdout); - } else { - if (sf_write_data(fd, file_offset, recv_buffer, data_size, subfile_rank) < - 0) { - free(recv_buffer); - recv_buffer = NULL; - printf("[ioc(%d) %s] sf_write_data returned an error!\n", subfile_rank, - __func__); - fflush(stdout); - return -1; +#endif + if (recv_buffer) { + free(recv_buffer); } - t_end = MPI_Wtime(); - t_write = t_end - t_start; - sf_pwrite_time += t_write; - } - - sf_queue_delay_time += t_queue_delay; - - /* Done... */ - msg->completed = 1; - if (recv_buffer) { - free(recv_buffer); - } - return 0; + return 0; } /*------------------------------------------------------------------------- @@ -1639,146 +1654,144 @@ int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, * *------------------------------------------------------------------------- */ -int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, - MPI_Comm comm) { - int fd; - char *send_buffer = NULL; - int ret = MPI_SUCCESS; - int64_t data_size = msg->header[0]; - int64_t file_offset = msg->header[1]; - int64_t file_context_id = msg->header[2]; - double t_start, t_end; - double t_read, t_queue_delay; - - subfiling_context_t *sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - sf_context->sf_read_count++; - /* For debugging performance */ - sf_read_ops++; - - t_start = MPI_Wtime(); - t_queue_delay = t_start - msg->start_time; - - fd = sf_context->sf_fid; - if (fd < 0) { - printf("[ioc(%d) %s] subfile(%d) file descriptor not valid\n", subfile_rank, - __func__, fd); - return -1; - } +int +queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm) +{ + int fd; + char * send_buffer = NULL; + int ret = MPI_SUCCESS; + int64_t data_size = msg->header[0]; + int64_t file_offset = msg->header[1]; + int64_t file_context_id = msg->header[2]; + double t_start, t_end; + double t_read, t_queue_delay; + + subfiling_context_t *sf_context = get__subfiling_object(file_context_id); + assert(sf_context != NULL); + + sf_context->sf_read_count++; + /* For debugging performance */ + sf_read_ops++; + + t_start = MPI_Wtime(); + t_queue_delay = t_start - msg->start_time; + + fd = sf_context->sf_fid; + if (fd < 0) { + printf("[ioc(%d) %s] subfile(%d) file descriptor not valid\n", subfile_rank, __func__, fd); + return -1; + } #ifndef NDEBUG - if (sf_verbose_flag && (sf_logfile != NULL)) { - fprintf(sf_logfile, - "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld " - "queue_delay=%lf seconds\n", - subfile_rank, __func__, source, data_size, file_offset, - t_queue_delay); - } + if (sf_verbose_flag && (sf_logfile != NULL)) { + fprintf(sf_logfile, + "[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld " + "queue_delay=%lf seconds\n", + subfile_rank, __func__, source, data_size, file_offset, t_queue_delay); + } #endif - if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) { - perror("malloc"); - return -1; - } - - if (sf_read_data(fd, file_offset, send_buffer, data_size, subfile_rank) < 0) { - printf("[%d] %s - sf_read_data fd=%d for source(%d) returned an error!\n", - subfile_rank, __func__, fd, source ); - fflush(stdout); - /* - * Should send a zero(0) byte message to the client to prevent - * it from hanging... - */ - MPI_Send(send_buffer, 0, MPI_BYTE, source, READ_INDEP_DATA, comm); - free(send_buffer); - return -1; - } - - ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, - comm); - if (ret != MPI_SUCCESS) { - int len; - char estring[MPI_MAX_ERROR_STRING]; - MPI_Error_string(ret, estring, &len); - printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an " - "error(%s)\n", - subfile_rank, data_size, source, estring); - fflush(stdout); - return ret; - } - t_end = MPI_Wtime(); - t_read = t_end - t_start; - sf_pread_time += t_read; - sf_queue_delay_time += t_queue_delay; + if ((send_buffer = (char *)malloc((size_t)data_size)) == NULL) { + perror("malloc"); + return -1; + } + + if (sf_read_data(fd, file_offset, send_buffer, data_size, subfile_rank) < 0) { + printf("[%d] %s - sf_read_data fd=%d for source(%d) returned an error!\n", subfile_rank, __func__, fd, + source); + fflush(stdout); + /* + * Should send a zero(0) byte message to the client to prevent + * it from hanging... + */ + MPI_Send(send_buffer, 0, MPI_BYTE, source, READ_INDEP_DATA, comm); + free(send_buffer); + return -1; + } + + ret = MPI_Send(send_buffer, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm); + if (ret != MPI_SUCCESS) { + int len; + char estring[MPI_MAX_ERROR_STRING]; + MPI_Error_string(ret, estring, &len); + printf("[ioc(%d)] ERROR! MPI_Send of %ld bytes to %d returned an " + "error(%s)\n", + subfile_rank, data_size, source, estring); + fflush(stdout); + return ret; + } + t_end = MPI_Wtime(); + t_read = t_end - t_start; + sf_pread_time += t_read; + sf_queue_delay_time += t_queue_delay; #ifndef NDEBUG - if (sf_verbose_flag && (sf_logfile != NULL)) { - fprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", - subfile_rank, source); - } + if (sf_verbose_flag && (sf_logfile != NULL)) { + fprintf(sf_logfile, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank, source); + } #endif - if (send_buffer) { - free(send_buffer); - send_buffer = NULL; - } + if (send_buffer) { + free(send_buffer); + send_buffer = NULL; + } - return 0; + return 0; } /* end queue_read_indep() */ /* --------------------------------------------------- * Helper function for subfiling_open_file() see below * ---------------------------------------------------*/ -static -void get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, - char **_prefix, char **_subfile_dir, char *filepath) +static void +get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int subfile_rank, char **_prefix, + char **_subfile_dir, char *filepath) { - const char slash = '/'; - char workdir[PATH_MAX]; - char configdir[PATH_MAX]; - - char *prefix = NULL, *subfile_dir = NULL; - int n_io_concentrators = sf_context->topology->n_io_concentrators; - - memset(workdir, 0, PATH_MAX); - getcwd(workdir,PATH_MAX); - - if ((prefix = sf_context->subfile_prefix) == NULL) { - memset(configdir, 0, PATH_MAX); - strncpy(configdir, sf_context->filename, strlen(sf_context->filename)); - prefix = dirname(configdir); - } - - size_t prefix_len = strlen(prefix); - if (strcmp(prefix, workdir)) { - if (prefix[prefix_len-1] == slash) { - if (sf_context->subfile_prefix) - sprintf(filepath, "%s" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - else - sprintf(filepath, "%s" SF_FILENAME_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - } - else { - if (sf_context->subfile_prefix) - sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); - else - sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, h5_file_id, - subfile_rank, n_io_concentrators); + const char slash = '/'; + char workdir[PATH_MAX]; + char configdir[PATH_MAX]; + + char *prefix = NULL, *subfile_dir = NULL; + int n_io_concentrators = sf_context->topology->n_io_concentrators; + + memset(workdir, 0, PATH_MAX); + getcwd(workdir, PATH_MAX); + + if ((prefix = sf_context->subfile_prefix) == NULL) { + memset(configdir, 0, PATH_MAX); + strncpy(configdir, sf_context->h5_filename, strlen(sf_context->h5_filename)); + prefix = dirname(configdir); + } + + size_t prefix_len = strlen(prefix); + if (strcmp(prefix, workdir)) { + if (prefix[prefix_len - 1] == slash) { + if (sf_context->subfile_prefix) + sprintf(filepath, "%s" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, subfile_rank, + n_io_concentrators); + else + sprintf(filepath, "%s" SF_FILENAME_TEMPLATE, prefix, h5_file_id, subfile_rank, + n_io_concentrators); + } + else { + if (sf_context->subfile_prefix) + sprintf(filepath, "%s/" SF_NODE_LOCAL_TEMPLATE, prefix, h5_file_id, subfile_rank, + n_io_concentrators); + else + sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, prefix, h5_file_id, subfile_rank, + n_io_concentrators); } - } else { - memset(configdir, 0, PATH_MAX); - strcpy(configdir, sf_context->filename); - subfile_dir = dirname(configdir); - sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, subfile_dir, h5_file_id, - subfile_rank, n_io_concentrators); - } - - *_prefix = prefix; - *_subfile_dir = subfile_dir; + } + else { + memset(configdir, 0, PATH_MAX); + strcpy(configdir, sf_context->h5_filename); + subfile_dir = dirname(configdir); + sprintf(filepath, "%s/" SF_FILENAME_TEMPLATE, subfile_dir, h5_file_id, subfile_rank, + n_io_concentrators); + } + + *_prefix = prefix; + *_subfile_dir = subfile_dir; } - + /*------------------------------------------------------------------------- * Function: Public/IOC subfiling_open_file * @@ -1805,153 +1818,158 @@ void get__subfile_name(subfiling_context_t *sf_context, int64_t h5_file_id, int * *------------------------------------------------------------------------- */ -int subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags) { - int errors = 0; - char filepath[PATH_MAX]; - char *prefix = NULL; - char *subfile_dir = NULL; - mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - - double t_start = 0.0, t_end = 0.0; - /* Only the real IOCs open the subfiles - * Once a file is opened, all additional file open requests - * can return immediately. - */ - - t_start = MPI_Wtime(); - /* Only allow the actual IO concentrator ranks to create sub-files */ - if (subfile_rank >= 0) { - char config[PATH_MAX]; - int64_t h5_file_id = msg->header[1]; - int64_t file_context_id = msg->header[2]; - subfiling_context_t *sf_context = get__subfiling_object(file_context_id); - assert(sf_context != NULL); - - memset(filepath, 0, PATH_MAX); - - begin_thread_exclusive(); - /* Check to see whether we need to create the subfile - * and possibly (IFF our subfile_rank is 0) a config file. +int +subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags) +{ + int errors = 0; + char filepath[PATH_MAX]; + char * prefix = NULL; + char * subfile_dir = NULL; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + + double t_start = 0.0, t_end = 0.0; + /* Only the real IOCs open the subfiles + * Once a file is opened, all additional file open requests + * can return immediately. */ - get__subfile_name(sf_context, h5_file_id, subfile_rank, &prefix, &subfile_dir, filepath); - - if (sf_context->sf_fid == -2) { - const char *dotconfig = ".subfile_config"; - int n_io_concentrators = sf_context->topology->n_io_concentrators; - int *io_concentrator = sf_context->topology->io_concentrator; - hid_t fapl = H5Pcreate(H5P_FILE_ACCESS); - void *fptr = H5FD_open(filepath, H5F_ACC_CREAT|H5F_ACC_RDWR, fapl, HADDR_UNDEF); - if (fptr) { - H5FD_close(fptr); - } - if ((sf_context->sf_fid = HDopen(filepath, flags|O_CREAT , mode)) < 0) { - end_thread_exclusive(); - HDprintf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, - filepath); - HDfflush(stdout); - -#ifndef NDEBUG - if (sf_verbose_flag) { - printf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, - filepath); - fflush(stdout); - } -#endif - errors++; - goto done; - } - - memset(filepath, 0, sizeof(filepath)); - strcpy(filepath, sf_context->filename); - subfile_dir = strrchr(filepath, '/'); - if (subfile_dir) { - size_t remaining = strlen(subfile_dir); - memset(subfile_dir, 0, remaining); - sprintf(subfile_dir, "/%ld%s", h5_file_id, dotconfig); - strcpy(config, filepath); - } - - if ((subfile_rank == 0) && (flags & O_CREAT)) { - FILE *f = NULL; - /* If a config file already exists, AND - * the user wants to truncate subfiles (if they exist), - * then we should also truncate an existing config file. + t_start = MPI_Wtime(); + /* Only allow the actual IO concentrator ranks to create sub-files */ + if (subfile_rank >= 0) { + int k, retries = 2; + char config[PATH_MAX]; + int64_t h5_file_id = msg->header[1]; + int64_t file_context_id = msg->header[2]; + subfiling_context_t *sf_context = get__subfiling_object(file_context_id); + assert(sf_context != NULL); + + memset(filepath, 0, PATH_MAX); + + begin_thread_exclusive(); + /* Check to see whether we need to create the subfile + * and possibly (IFF our subfile_rank is 0) a config file. */ - if (access(config, flags) == 0) { - truncate(config, 0); - } - f = HDfopen(config, "w+"); - if (f != NULL) { - int k; - char linebuf[PATH_MAX]; - sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size); - HDfwrite(linebuf, strlen(linebuf), 1, f); - sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators); - HDfwrite(linebuf, strlen(linebuf), 1, f); - sprintf(linebuf, "hdf5_file=%s\n", sf_context->filename); - HDfwrite(linebuf, strlen(linebuf), 1, f); - - for (k = 0; k < n_io_concentrators; k++) { - if (prefix) - sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d\n", prefix, - h5_file_id, subfile_rank, n_io_concentrators, - io_concentrator[k]); - else - sprintf(linebuf, "%ld_node_local_temp_%d_of_%d:%d\n", h5_file_id, - subfile_rank, n_io_concentrators, io_concentrator[k]); - HDfwrite(linebuf, strlen(linebuf), 1, f); - } + get__subfile_name(sf_context, h5_file_id, subfile_rank, &prefix, &subfile_dir, filepath); + sf_context->sf_filename = strdup(filepath); + + assert(sf_context->sf_filename); + + /* Check if we need to create the subfiles */ + if (sf_context->sf_fid == -2) { + const char *dotconfig = "-subfile_config"; + int n_io_concentrators = sf_context->topology->n_io_concentrators; + int * io_concentrator = sf_context->topology->io_concentrator; + for (k = 0; k < retries; k++) { + int fd; + if ((fd = HDopen(filepath, O_CREAT | O_RDWR | O_TRUNC, mode)) > 0) { + sf_context->sf_fid = fd; + break; + } + } + if (sf_context->sf_fid < 0) { + end_thread_exclusive(); + perror("subfiling_open_file/open"); + HDprintf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, filepath); + HDfflush(stdout); - fclose(f); - } else { - perror("fopen(config)"); - errors++; - goto done; - } - } #ifndef NDEBUG - if (sf_verbose_flag) { - if (sf_logfile) { - HDfprintf(sf_logfile, "[ioc:%d] Opened subfile %s\n", subfile_rank, - filepath); - } - } + if (sf_verbose_flag) { + printf("[%d %s] file create(%s) failed!\n", subfile_rank, __func__, filepath); + fflush(stdout); + } #endif - } - else { - if ((sf_context->sf_fid = HDopen(filepath, flags, mode)) < 0) { - end_thread_exclusive(); - HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, - filepath); - HDfflush(stdout); - + errors++; + goto done; + } + + memset(config, 0, sizeof(filepath)); + sprintf(config, "%s%s", sf_context->h5_filename, dotconfig); + + if ((subfile_rank == 0) && (flags & O_CREAT)) { + FILE *f = NULL; + /* If a config file already exists, AND + * the user wants to truncate subfiles (if they exist), + * then we should also truncate an existing config file. + */ + if (access(config, flags) == 0) { + truncate(config, 0); + } + f = HDfopen(config, "w+"); + if (f != NULL) { + char linebuf[PATH_MAX]; + sprintf(linebuf, "stripe_size=%ld\n", sf_context->sf_stripe_size); + HDfwrite(linebuf, strlen(linebuf), 1, f); + sprintf(linebuf, "aggregator_count=%d\n", n_io_concentrators); + HDfwrite(linebuf, strlen(linebuf), 1, f); + sprintf(linebuf, "hdf5_file=%s\n", sf_context->h5_filename); + HDfwrite(linebuf, strlen(linebuf), 1, f); + + for (k = 0; k < n_io_concentrators; k++) { + if (prefix) + sprintf(linebuf, "%s/%ld_node_local_temp_%d_of_%d:%d\n", prefix, h5_file_id, + subfile_rank, n_io_concentrators, io_concentrator[k]); + else + sprintf(linebuf, "%ld_subfile_%d_of_%d:%d\n", h5_file_id, subfile_rank, + n_io_concentrators, io_concentrator[k]); + + HDfwrite(linebuf, strlen(linebuf), 1, f); + } + + fclose(f); + } + else { + perror("fopen(config)"); + errors++; + goto done; + } + } #ifndef NDEBUG - if (sf_verbose_flag) { - HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, - filepath); - HDfflush(stdout); + if (sf_verbose_flag) { + if (sf_logfile) { + HDfprintf(sf_logfile, "[ioc:%d] Opened subfile %s\n", subfile_rank, filepath); + } + } +#endif } + else { + for (k = 0; k < retries; k++) { + int fd; + if ((fd = HDopen(filepath, O_CREAT | O_RDWR, mode)) > 0) { + sf_context->sf_fid = fd; + break; + } + } + if (sf_context->sf_fid < 0) { + end_thread_exclusive(); + perror("subfiling_open_file/open"); + HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, filepath); + HDfflush(stdout); + +#ifndef NDEBUG + if (sf_verbose_flag) { + HDprintf("[%d %s] file open(%s) failed!\n", subfile_rank, __func__, filepath); + HDfflush(stdout); + } #endif - errors++; - goto done; - } - } - end_thread_exclusive(); - } + errors++; + goto done; + } + } + end_thread_exclusive(); + } done: - t_end = MPI_Wtime(); + t_end = MPI_Wtime(); #ifndef NDEBUG - if (sf_verbose_flag) { - printf("[%s %d] open completed in %lf seconds with %d errors\n", __func__, - subfile_rank, (t_end - t_start), errors); - fflush(stdout); - } + if (sf_verbose_flag) { + printf("[%s %d] open completed in %lf seconds with %d errors\n", __func__, subfile_rank, + (t_end - t_start), errors); + fflush(stdout); + } #endif - return errors; + return errors; } /* end subfiling_open_file() */ /*------------------------------------------------------------------------- @@ -1983,57 +2001,65 @@ done: *------------------------------------------------------------------------- */ -int sf_get_mpi_rank(hid_t fid, int *rank) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - assert(rank != NULL); - *rank = sf_context->sf_group_rank; - return 0; +int +sf_get_mpi_rank(hid_t fid, int *rank) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + assert(rank != NULL); + *rank = sf_context->sf_group_rank; + return 0; } -int sf_get_mpi_size(hid_t fid, int *size) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - assert(size != NULL); - *size = sf_context->sf_group_size; - return 0; +int +sf_get_mpi_size(hid_t fid, int *size) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + assert(size != NULL); + *size = sf_context->sf_group_size; + return 0; } -int sf_get_group_comm(hid_t fid, MPI_Comm *comm) { - hid_t context_id = fid_map_to_context((uint64_t)fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - assert(sf_context != NULL); - assert(comm != NULL); - *comm = sf_context->sf_group_comm; - return 0; +int +sf_get_group_comm(hid_t fid, MPI_Comm *comm) +{ + hid_t context_id = fid_map_to_context((uint64_t)fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + assert(sf_context != NULL); + assert(comm != NULL); + *comm = sf_context->sf_group_comm; + return 0; } -int sf_subfile_set_logging(hid_t sf_fid, int ioc_rank, int flag) { - int ioc; - int status = 0; - hid_t context_id = fid_map_to_context((uint64_t)sf_fid); - subfiling_context_t *sf_context = get__subfiling_object(context_id); - int n_io_concentrators; - int *io_concentrator = NULL; - int64_t lflag = (int64_t)(flag & 0xFF); - int64_t msg[3]; - - assert(sf_context != NULL); - - msg[0] = lflag; - msg[1] = 0; - msg[2] = sf_context->sf_context_id; - - n_io_concentrators = sf_context->topology->n_io_concentrators; - io_concentrator = sf_context->topology->io_concentrator; - - for (ioc = 0; ioc < n_io_concentrators; ioc++) { - if ((flag < 0) || (flag == ioc_rank)) { - status = MPI_Ssend(msg, 3, MPI_INT64_T, io_concentrator[ioc], LOGGING_OP, - sf_context->sf_msg_comm); +int +sf_subfile_set_logging(hid_t sf_fid, int ioc_rank, int flag) +{ + int ioc; + int status = 0; + hid_t context_id = fid_map_to_context((uint64_t)sf_fid); + subfiling_context_t *sf_context = get__subfiling_object(context_id); + int n_io_concentrators; + int * io_concentrator = NULL; + int64_t lflag = (int64_t)(flag & 0xFF); + int64_t msg[3]; + + assert(sf_context != NULL); + + msg[0] = lflag; + msg[1] = 0; + msg[2] = sf_context->sf_context_id; + + n_io_concentrators = sf_context->topology->n_io_concentrators; + io_concentrator = sf_context->topology->io_concentrator; + + for (ioc = 0; ioc < n_io_concentrators; ioc++) { + if ((flag < 0) || (flag == ioc_rank)) { + status = + MPI_Ssend(msg, 3, MPI_INT64_T, io_concentrator[ioc], LOGGING_OP, sf_context->sf_msg_comm); + } } - } - return status; + return status; } diff --git a/src/H5FDsubfiling.c b/src/H5FDsubfiling.c index e36be8f..27ed44a 100644 --- a/src/H5FDsubfiling.c +++ b/src/H5FDsubfiling.c @@ -46,9 +46,9 @@ FILE *client_log = NULL; #endif /* These are used for the creation of read or write vectors */ -static haddr_t *sf_offsets = NULL; -static hssize_t *sf_sizes = NULL; -static void ** sf_bufs = NULL; +static haddr_t * sf_offsets = NULL; +static hssize_t *sf_sizes = NULL; +static void ** sf_bufs = NULL; /* The description of a file belonging to this driver. The 'eoa' and 'eof' * determine the amount of hdf5 address space in use and the high-water mark @@ -182,17 +182,16 @@ typedef struct H5FD_subfiling_t { #define MAXADDR (((haddr_t)1 << (8 * sizeof(HDoff_t) - 1)) - 1) #define ADDR_OVERFLOW(A) (HADDR_UNDEF == (A) || ((A) & ~(haddr_t)MAXADDR)) #define SIZE_OVERFLOW(Z) ((Z) & ~(hsize_t)MAXADDR) -#define REGION_OVERFLOW(A, Z) \ - (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || HADDR_UNDEF == (A) + (Z) || \ - (HDoff_t)((A) + (Z)) < (HDoff_t)(A)) +#define REGION_OVERFLOW(A, Z) \ + (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || HADDR_UNDEF == (A) + (Z) || (HDoff_t)((A) + (Z)) < (HDoff_t)(A)) #define H5FD_IOC_DEBUG_OP_CALLS 0 /* debugging print toggle; 0 disables */ #if H5FD_SUBFILING_DEBUG_OP_CALLS -#define H5FD_SUBFILING_LOG_CALL(name) \ - do { \ - HDprintf("called %s()\n", (name)); \ - HDfflush(stdout); \ +#define H5FD_SUBFILING_LOG_CALL(name) \ + do { \ + HDprintf("called %s()\n", (name)); \ + HDfflush(stdout); \ } while (0) #else #define H5FD_SUBFILING_LOG_CALL(name) /* no-op */ @@ -204,46 +203,33 @@ static herr_t H5FD__subfiling_term(void); static void * H5FD__subfiling_fapl_get(H5FD_t *_file); static void * H5FD__subfiling_fapl_copy(const void *_old_fa); static herr_t H5FD__subfiling_fapl_free(void *_fa); -static H5FD_t *H5FD__subfiling_open(const char *name, unsigned flags, - hid_t fapl_id, haddr_t maxaddr); +static H5FD_t *H5FD__subfiling_open(const char *name, unsigned flags, hid_t fapl_id, haddr_t maxaddr); static herr_t H5FD__subfiling_close(H5FD_t *_file); static int H5FD__subfiling_cmp(const H5FD_t *_f1, const H5FD_t *_f2); static herr_t H5FD__subfiling_query(const H5FD_t *_f1, unsigned long *flags); static haddr_t H5FD__subfiling_get_eoa(const H5FD_t *_file, H5FD_mem_t type); -static herr_t H5FD__subfiling_set_eoa(H5FD_t *_file, H5FD_mem_t type, - haddr_t addr); +static herr_t H5FD__subfiling_set_eoa(H5FD_t *_file, H5FD_mem_t type, haddr_t addr); static haddr_t H5FD__subfiling_get_eof(const H5FD_t *_file, H5FD_mem_t type); -static herr_t H5FD__subfiling_get_handle(H5FD_t *_file, hid_t fapl, - void **file_handle); -static herr_t H5FD__subfiling_read(H5FD_t *_file, H5FD_mem_t type, - hid_t fapl_id, haddr_t addr, size_t size, +static herr_t H5FD__subfiling_get_handle(H5FD_t *_file, hid_t fapl, void **file_handle); +static herr_t H5FD__subfiling_read(H5FD_t *_file, H5FD_mem_t type, hid_t fapl_id, haddr_t addr, size_t size, void *buf); -static herr_t H5FD__subfiling_write(H5FD_t *_file, H5FD_mem_t type, - hid_t dxpl_id, haddr_t addr, size_t size, +static herr_t H5FD__subfiling_write(H5FD_t *_file, H5FD_mem_t type, hid_t dxpl_id, haddr_t addr, size_t size, const void *buf); -static herr_t H5FD__subfiling_read_vector(H5FD_t *file, hid_t dxpl_id, - uint32_t count, H5FD_mem_t types[], - haddr_t addrs[], size_t sizes[], - void *bufs[] /* out */); -static herr_t H5FD__subfiling_write_vector(H5FD_t *file, hid_t dxpl_id, - uint32_t count, H5FD_mem_t types[], - haddr_t addrs[], size_t sizes[], - const void *bufs[] /* in */); +static herr_t H5FD__subfiling_read_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count, H5FD_mem_t types[], + haddr_t addrs[], size_t sizes[], void *bufs[] /* out */); +static herr_t H5FD__subfiling_write_vector(H5FD_t *file, hid_t dxpl_id, uint32_t count, H5FD_mem_t types[], + haddr_t addrs[], size_t sizes[], const void *bufs[] /* in */); -static herr_t H5FD__subfiling_truncate(H5FD_t *_file, hid_t dxpl_id, - hbool_t closing); +static herr_t H5FD__subfiling_truncate(H5FD_t *_file, hid_t dxpl_id, hbool_t closing); static herr_t H5FD__subfiling_lock(H5FD_t *_file, hbool_t rw); static herr_t H5FD__subfiling_unlock(H5FD_t *_file); -static herr_t H5FD__subfiling_ctl(H5FD_t *_file, uint64_t op_code, - uint64_t flags, - const void H5_ATTR_UNUSED *input, - void **output); +static herr_t H5FD__subfiling_ctl(H5FD_t *_file, uint64_t op_code, uint64_t flags, + const void H5_ATTR_UNUSED *input, void **output); static herr_t H5FD__subfiling_validate_config(const H5FD_subfiling_config_t *fa); - #if 0 /* JRM */ /* delete if all goes well */ static int H5FD__subfiling_mpi_rank(const H5FD_t *_file); static int H5FD__subfiling_mpi_size(const H5FD_t *_file); @@ -287,8 +273,8 @@ static const H5FD_class_t H5FD_subfiling_g = { NULL, /* write_selection */ NULL, /* flush */ H5FD__subfiling_truncate, /* truncate */ - NULL, /* lock */ - NULL, /* unlock */ + H5FD__subfiling_lock, /* lock */ + H5FD__subfiling_unlock, /* unlock */ H5FD__subfiling_ctl, /* ctl */ H5FD_FLMAP_DICHOTOMY /* fl_map */ }; @@ -477,11 +463,11 @@ fapl__get_subfiling_defaults(H5FD_subfiling_config_t *fa) herr_t H5Pset_fapl_subfiling(hid_t fapl_id, H5FD_subfiling_config_t *fa) { - H5P_genplist_t * plist = NULL; /* Property list pointer */ - hid_t ioc_fapl = H5I_INVALID_HID; - H5FD_ioc_config_t ioc_config; + H5P_genplist_t * plist = NULL; /* Property list pointer */ + hid_t ioc_fapl = H5I_INVALID_HID; + H5FD_ioc_config_t ioc_config; H5FD_subfiling_config_t subfiling_conf; - herr_t ret_value = FAIL; + herr_t ret_value = FAIL; FUNC_ENTER_API(FAIL) H5TRACE2("e", "i*!", fapl_id, fa); @@ -914,12 +900,20 @@ H5FD__subfiling_close(H5FD_t *_file) { H5FD_subfiling_t *file_ptr = (H5FD_subfiling_t *)_file; herr_t ret_value = SUCCEED; /* Return value */ + // subfiling_context_t *sf_context = NULL; FUNC_ENTER_NOAPI_NOINIT /* Sanity check */ HDassert(file_ptr); - +#ifdef VERBOSE + sf_context = (subfiling_context_t *)get__subfiling_object(file_ptr->fa.common.context_id); + if (sf_context->topology->rank_is_ioc) + printf("[%s %d] fd=%d\n", __func__, file_ptr->mpi_rank, sf_context->sf_fid); + else + printf("[%s %d] fd=*\n", __func__, file_ptr->mpi_rank); + fflush(stdout); +#endif if (H5FD_close(file_ptr->sf_file) != SUCCEED) { HSYS_GOTO_ERROR(H5E_IO, H5E_CANTCLOSEFILE, FAIL, "unable to close file") } @@ -1119,11 +1113,19 @@ H5FD__subfiling_set_eoa(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, haddr_t a static haddr_t H5FD__subfiling_get_eof(const H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type) { - const H5FD_subfiling_t *file = (const H5FD_subfiling_t *)_file; + H5FD_subfiling_t *file = (const H5FD_subfiling_t *)_file; + haddr_t ret_value = HADDR_UNDEF; + haddr_t local_eof, global_eof = 0; + FUNC_ENTER_STATIC - FUNC_ENTER_NOAPI_NOINIT_NOERR + local_eof = H5FD_get_eof(file->sf_file, type); + if (MPI_SUCCESS != MPI_Allreduce(&local_eof, &global_eof, 1, MPI_LONG_LONG, MPI_MAX, MPI_COMM_WORLD)) + HGOTO_ERROR(H5E_INTERNAL, H5E_CANTGET, HADDR_UNDEF, "mpi_allreduce failed") + /* Return the global max of all the subfile EOF values */ - FUNC_LEAVE_NOAPI(file->eof) + ret_value = global_eof; +done: + FUNC_LEAVE_NOAPI(ret_value) } /* end H5FD_subfiling_get_eof() */ /*------------------------------------------------------------------------- @@ -1200,6 +1202,11 @@ H5FD__subfiling_read(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, hid_t H5_ATT * used to invoke the underlying IO operations. */ ioc_total = sf_context->topology->n_io_concentrators; +#ifdef VERBOSE + printf("[%s %d] fd=%d\n", __func__, file_ptr->mpi_rank, sf_context->sf_fid); + fflush(stdout); +#endif + if (ioc_total > 1) { blocksize = sf_context->sf_blocksize_per_stripe; #if 0 /* JRM */ @@ -1418,7 +1425,6 @@ H5FD__subfiling_write(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, hid_t H5_AT HDassert(buf); sf_context = (subfiling_context_t *)get__subfiling_object(file_ptr->fa.common.context_id); - HDassert(sf_context); HDassert(sf_context->topology); @@ -1431,6 +1437,15 @@ H5FD__subfiling_write(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, hid_t H5_AT * used to invoke the underlying IO operations. */ ioc_total = sf_context->topology->n_io_concentrators; + +#ifdef VERBOSE + if (sf_context->topology->rank_is_ioc) + printf("[%s %d] fd=%d\n", __func__, file_ptr->mpi_rank, sf_context->sf_fid); + else + printf("[%s %d] fd=*\n", __func__, file_ptr->mpi_rank); + fflush(stdout); +#endif + if (ioc_total > 1) { blocksize = sf_context->sf_blocksize_per_stripe; #if 0 /* JRM */ @@ -1458,6 +1473,11 @@ H5FD__subfiling_write(H5FD_t *_file, H5FD_mem_t H5_ATTR_UNUSED type, hid_t H5_AT addr += _file->base_addr; +#ifdef VERBOSE + printf("[%s %d] addr=%ld, size=%ld\n", __func__, file_ptr->mpi_rank, addr, size); + fflush(stdout); +#endif + /* Follow the example of read_vector (see H5FDint.c) */ addrs_cooked = TRUE; @@ -1817,8 +1837,8 @@ done: static herr_t H5FD__subfiling_truncate(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, hbool_t H5_ATTR_UNUSED closing) { - H5FD_subfiling_t *file = (H5FD_subfiling_t *)_file; - herr_t ret_value = SUCCEED; /* Return value */ + H5FD_subfiling_t *file = (H5FD_subfiling_t *)_file; + herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_NOAPI_NOINIT @@ -1855,8 +1875,8 @@ H5FD__subfiling_truncate(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, hbool_t H5 static herr_t H5FD__subfiling_lock(H5FD_t *_file, hbool_t rw) { - H5FD_subfiling_t *file_ptr = (H5FD_subfiling_t *)_file; /* VFD file struct */ - herr_t ret_value = SUCCEED; /* Return value */ + H5FD_subfiling_t *file_ptr = (H5FD_subfiling_t *)_file; /* VFD file struct */ + herr_t ret_value = SUCCEED; /* Return value */ FUNC_ENTER_NOAPI_NOINIT @@ -2023,14 +2043,14 @@ done: } /* end H5FD__subfiling_ctl() */ static herr_t -create__simple_vector(hid_t H5_ATTR_UNUSED file_space_id, void *memDataBuf, haddr_t addrBase, hssize_t elements, - size_t type_extent, hssize_t *vlen, haddr_t **_offsets, hssize_t **_blocklens, - void ***_bufs) +create__simple_vector(hid_t H5_ATTR_UNUSED file_space_id, void *memDataBuf, haddr_t addrBase, + hssize_t elements, size_t type_extent, hssize_t *vlen, haddr_t **_offsets, + hssize_t **_blocklens, void ***_bufs) { - haddr_t *offsets = *_offsets; + haddr_t * offsets = *_offsets; hssize_t *blocklens = *_blocklens; - void ** bufs = *_bufs; - void * nextBuf = memDataBuf; + void ** bufs = *_bufs; + void * nextBuf = memDataBuf; assert(vlen); assert(_offsets); @@ -2232,7 +2252,7 @@ H5FD__dataset_write_contiguous(hid_t h5_file_id, haddr_t dataset_baseAddr, size_ { herr_t ret_value = SUCCEED; /* Return value */ hssize_t num_elem_file = (hssize_t)-1, num_elem_mem = (hssize_t)-1; - hssize_t s_dtype_extent = (hssize_t)dtype_extent; + hssize_t s_dtype_extent = (hssize_t)dtype_extent; H5S_sel_type sel_type; hssize_t sf_vlen = -1; diff --git a/src/H5FDsubfiling.h b/src/H5FDsubfiling.h index 3c67b93..93a2b0a 100644 --- a/src/H5FDsubfiling.h +++ b/src/H5FDsubfiling.h @@ -121,9 +121,10 @@ typedef struct config_common_t { int32_t stripe_count; int64_t stripe_depth; ioc_selection_t ioc_selection; - hid_t ioc_fapl_id; + hid_t ioc_fapl_id; char subfile_dir[H5FD_SUBFILING_PATH_MAX +1]; - char subfile_path[H5FD_SUBFILING_PATH_MAX +1]; + char subfile_path[H5FD_SUBFILING_PATH_MAX +1]; + char h5_filename[H5FD_SUBFILING_PATH_MAX +1]; } config_common_t; ****************************************************************************/ @@ -175,8 +176,7 @@ H5_DLL hid_t H5FD_subfiling_init(void); * \since 1.14.0 * */ -H5_DLL herr_t H5Pget_fapl_subfiling(hid_t fapl_id, - H5FD_subfiling_config_t *config_out); +H5_DLL herr_t H5Pget_fapl_subfiling(hid_t fapl_id, H5FD_subfiling_config_t *config_out); /** * \ingroup FAPL * @@ -195,69 +195,58 @@ H5_DLL herr_t H5Pget_fapl_subfiling(hid_t fapl_id, * \since 1.14.0 * */ -H5_DLL herr_t H5Pset_fapl_subfiling(hid_t fapl_id, - H5FD_subfiling_config_t *vfd_config); +H5_DLL herr_t H5Pset_fapl_subfiling(hid_t fapl_id, H5FD_subfiling_config_t *vfd_config); H5_DLL herr_t H5FD__get_file_ino(const char *name, uint64_t *st_ino); -H5_DLL char *H5FD__get_file_directory(void *h5file); -H5_DLL herr_t H5FD__dataset_write_contiguous( - hid_t h5_file_id, haddr_t dataset_baseAddr, size_t dtype_extent, - int mpi_rank, int mpi_size, void *_dset, hid_t mem_type_id, - hid_t mem_space_id, hid_t file_space_id, hid_t plist_id, const void *buf); -H5_DLL herr_t H5FD__dataset_read_contiguous( - hid_t h5_file_id, haddr_t dataset_baseAddr, size_t dtype_extent, - int mpi_rank, int mpi_size, void *_dset, hid_t mem_type_id, - hid_t mem_space_id, hid_t file_space_id, hid_t plist_id, void *buf); +H5_DLL char * H5FD__get_file_directory(void *h5file); +H5_DLL herr_t H5FD__dataset_write_contiguous(hid_t h5_file_id, haddr_t dataset_baseAddr, size_t dtype_extent, + int mpi_rank, int mpi_size, void *_dset, hid_t mem_type_id, + hid_t mem_space_id, hid_t file_space_id, hid_t plist_id, + const void *buf); +H5_DLL herr_t H5FD__dataset_read_contiguous(hid_t h5_file_id, haddr_t dataset_baseAddr, size_t dtype_extent, + int mpi_rank, int mpi_size, void *_dset, hid_t mem_type_id, + hid_t mem_space_id, hid_t file_space_id, hid_t plist_id, + void *buf); H5_DLL char *get_ioc_selection_criteria(ioc_selection_t *); H5_DLL void *get__subfiling_object(int64_t object_id); H5_DLL hid_t fid_map_to_context(uint64_t h5_fid); -/* return arguments are vector of vectors - function return is the length - * (depth) of the sub vectors. Note that we don't need to include the - * MPI_Datatype return argument! +/* return arguments are vector of vectors - function return is the length + * (depth) of the sub vectors. Note that we don't need to include the + * MPI_Datatype return argument! */ H5_DLL int subfiling_open_file(sf_work_request_t *msg, int subfile_rank, int flags); -H5_DLL int init__indep_io(void *_sf_context, size_t depth, int ioc_total, - int64_t *sf_source_data_offset, int64_t *sf_datasize, - int64_t *f_offset, int *first_index, int *n_containers, - int64_t offset, int64_t elements, - int dtype_extent); - -H5_DLL int H5FD__open_subfiles(void *_config_info, uint64_t inode_id, int flags); -H5_DLL int H5FD__close_subfiles(hid_t context_id); -H5_DLL int H5FD__read_independent(hid_t H5FD__fid, int64_t offset, - int64_t elements, int dtype_extent, - void *data); -H5_DLL int H5FD__write_independent(hid_t H5FD__fid, int64_t offset, - int64_t elements, int dtype_extent, - const void *data); -H5_DLL herr_t H5FD__read_vector(hid_t h5_fid, hssize_t count, haddr_t *addrs, - hsize_t sizes[], void *bufs[] /* in */); -H5_DLL herr_t H5FD__write_vector(hid_t h5_fid, hssize_t count, haddr_t *addrs, - hsize_t sizes[], void *bufs[] /* in */); -H5_DLL int H5FD__truncate(hid_t h5_fid, haddr_t addr); -H5_DLL int H5FD__shutdown_local_ioc(hid_t fid); -H5_DLL void manage_client_logfile(int client_rank, int flag_value); -H5_DLL int initialize_ioc_threads(void *sf_context); -H5_DLL herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, - haddr_t addrs[], size_t sizes[], +H5_DLL int init__indep_io(void *_sf_context, size_t depth, int ioc_total, int64_t *sf_source_data_offset, + int64_t *sf_datasize, int64_t *f_offset, int *first_index, int *n_containers, + int64_t offset, int64_t elements, int dtype_extent); + +H5_DLL int H5FD__open_subfiles(void *_config_info, uint64_t inode_id, int flags); +H5_DLL int H5FD__close_subfiles(hid_t context_id); +H5_DLL int H5FD__read_independent(hid_t H5FD__fid, int64_t offset, int64_t elements, int dtype_extent, + void *data); +H5_DLL int H5FD__write_independent(hid_t H5FD__fid, int64_t offset, int64_t elements, int dtype_extent, + const void *data); +H5_DLL herr_t H5FD__read_vector(hid_t h5_fid, hssize_t count, haddr_t *addrs, hsize_t sizes[], + void *bufs[] /* in */); +H5_DLL herr_t H5FD__write_vector(hid_t h5_fid, hssize_t count, haddr_t *addrs, hsize_t sizes[], + void *bufs[] /* in */); +H5_DLL int H5FD__truncate(hid_t h5_fid, haddr_t addr); +H5_DLL int H5FD__shutdown_local_ioc(hid_t fid); +H5_DLL void manage_client_logfile(int client_rank, int flag_value); +H5_DLL int initialize_ioc_threads(void *sf_context); +H5_DLL herr_t H5FD__write_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], const void *bufs[] /* data_in */); -H5_DLL herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, - haddr_t addrs[], size_t sizes[], +H5_DLL herr_t H5FD__read_vector_internal(hid_t h5_fid, hssize_t count, haddr_t addrs[], size_t sizes[], void *bufs[] /* data_out */); -H5_DLL int queue_write_indep(sf_work_request_t *msg, int subfile_rank, - int source, MPI_Comm comm); +H5_DLL int queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); -H5_DLL int queue_read_indep(sf_work_request_t *msg, int subfile_rank, - int source, MPI_Comm comm); +H5_DLL int queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm); -H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, - int64_t data_size, int subfile_rank); +H5_DLL int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank); -H5_DLL int sf_write_data(int fd, int64_t file_offset, void *data_buffer, - int64_t data_size, int subfile_rank); +H5_DLL int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank); #ifdef __cplusplus } diff --git a/src/H5FDsubfiling_priv.h b/src/H5FDsubfiling_priv.h index bf39b38..d26bf4c 100644 --- a/src/H5FDsubfiling_priv.h +++ b/src/H5FDsubfiling_priv.h @@ -137,27 +137,26 @@ extern "C" { #ifndef H5FD_SUBFILING_FAPL_T_MAGIC #define H5FD_CURR_SUBFILING_FAPL_T_VERSION 1 -#define H5FD_SUBFILING_FAPL_T_MAGIC 0xFED01331 +#define H5FD_SUBFILING_FAPL_T_MAGIC 0xFED01331 #endif #ifndef H5FD_IOC_FAPL_T_MAGIC #define H5FD_CURR_IOC_FAPL_T_VERSION 1 -#define H5FD_IOC_FAPL_T_MAGIC 0xFED21331 +#define H5FD_IOC_FAPL_T_MAGIC 0xFED21331 #endif -#define DRIVER_INFO_MESSAGE_MAX_INFO 65536 -#define DRIVER_INFO_MESSAGE_MAX_LENGTH \ - 65552 /* MAX_INFO + sizeof(info_header_t) */ +#define DRIVER_INFO_MESSAGE_MAX_INFO 65536 +#define DRIVER_INFO_MESSAGE_MAX_LENGTH 65552 /* MAX_INFO + sizeof(info_header_t) */ -#define K(n) ((n)*1024) -#define M(n) ((n) * (1024 * 1024)) +#define K(n) ((n)*1024) +#define M(n) ((n) * (1024 * 1024)) #define H5FD_DEFAULT_STRIPE_DEPTH M(32) typedef struct stat_record { - int64_t op_count; /* How many ops in total */ - double min; /* minium (time) */ - double max; /* maximum (time) */ - double total; /* average (time) */ + int64_t op_count; /* How many ops in total */ + double min; /* minium (time) */ + double max; /* maximum (time) */ + double total; /* average (time) */ } stat_record_t; typedef enum stat_category { /* Stat (OP) Categories */ @@ -172,12 +171,12 @@ typedef enum stat_category { /* Stat (OP) Categories */ } stat_category_t; typedef struct _info_header { /* Header for a driver info message */ - uint8_t version; - uint8_t unused_1; - uint8_t unused_2; - uint8_t unused_3; /* Actual info message length, but */ - int32_t info_length; /* CANNOT exceed 64k (65552) bytes */ - char vfd_key[8]; /* 's' 'u' 'b' 'f' 'i' 'l' 'i' 'n' */ + uint8_t version; + uint8_t unused_1; + uint8_t unused_2; + uint8_t unused_3; /* Actual info message length, but */ + int32_t info_length; /* CANNOT exceed 64k (65552) bytes */ + char vfd_key[8]; /* 's' 'u' 'b' 'f' 'i' 'l' 'i' 'n' */ } info_header_t; /* THE following definitions are used between H5FDsubfile_mpi.c @@ -214,21 +213,21 @@ typedef struct _info_header { /* Header for a driver info message */ /* Bit 3 SET indicates collectives */ #define COLL_FUNC (0x1 << 3) -#define ACK_PART (0x0acc << 8) +#define ACK_PART (0x0acc << 8) #define DATA_PART (0xd8da << 8) -#define READY (0xfeed << 8) +#define READY (0xfeed << 8) #define COMPLETED (0xfed1 << 8) -#define READ_INDEP (READ_OP) -#define READ_COLL (COLL_FUNC | READ_OP) +#define READ_INDEP (READ_OP) +#define READ_COLL (COLL_FUNC | READ_OP) #define WRITE_INDEP (WRITE_OP) -#define WRITE_COLL (COLL_FUNC | WRITE_OP) +#define WRITE_COLL (COLL_FUNC | WRITE_OP) -#define WRITE_INDEP_ACK (ACK_PART | WRITE_OP) +#define WRITE_INDEP_ACK (ACK_PART | WRITE_OP) #define WRITE_INDEP_DATA (DATA_PART | WRITE_OP) #define READ_INDEP_DATA (DATA_PART | READ_OP) -#define SET_LOGGING (LOGGING_OP) +#define SET_LOGGING (LOGGING_OP) #define INT32_MASK 0x07FFFFFFFFFFFFFFF @@ -240,22 +239,22 @@ typedef struct _info_header { /* Header for a driver info message */ * We currently ONLY use READ_OP and WRITE_OP */ typedef enum io_ops { - READ_OP = 1, - WRITE_OP = 2, - OPEN_OP = 3, - CLOSE_OP = 4, - FINI_OP = 8, - LOGGING_OP = 16 + READ_OP = 1, + WRITE_OP = 2, + OPEN_OP = 3, + CLOSE_OP = 4, + FINI_OP = 8, + LOGGING_OP = 16 } io_op_t; /* Here are the basic key values to be used when accessing * the cache of stored topologies or contexts. */ typedef enum { - SF_BADID = (-1), - SF_TOPOLOGY = 1, - SF_CONTEXT = 2, - SF_NTYPES /* number of subfiling object types, MUST BE LAST */ + SF_BADID = (-1), + SF_TOPOLOGY = 1, + SF_CONTEXT = 2, + SF_NTYPES /* number of subfiling object types, MUST BE LAST */ } sf_obj_type_t; /* Every application rank will record their MPI rank @@ -268,54 +267,57 @@ typedef enum { * associated with a "new" hostid. */ typedef struct { - long rank; - long hostid; + long rank; + long hostid; } layout_t; /* This typedef defines a fixed process layout which * can be reused for any number of file open operations */ typedef struct app_layout_t { - long hostid; /* value returned by gethostid() */ - layout_t *layout; /* Vector of {rank,hostid} values */ - int *node_ranks; /* ranks extracted from sorted layout */ - int node_count; /* Total nodes (differnt hostids) */ - int node_index; /* My node: index into node_ranks */ - int local_peers; /* How may local peers on my node */ - int world_rank; /* My MPI rank */ - int world_size; /* Total number of MPI ranks */ + long hostid; /* value returned by gethostid() */ + layout_t *layout; /* Vector of {rank,hostid} values */ + int * node_ranks; /* ranks extracted from sorted layout */ + int node_count; /* Total nodes (differnt hostids) */ + int node_index; /* My node: index into node_ranks */ + int local_peers; /* How may local peers on my node */ + int world_rank; /* My MPI rank */ + int world_size; /* Total number of MPI ranks */ } app_layout_t; /* This typedef defines things related to IOC selections */ typedef struct topology { - app_layout_t *app_layout; /* Pointer to our layout struct */ - bool rank_is_ioc; /* Indicates that we host an IOC */ - int subfile_rank; /* Valid only if rank_is_ioc */ - int n_io_concentrators; /* Number of IO concentrators */ - int *io_concentrator; /* Vector of ranks which are IOCs */ - int *subfile_fd; /* file descriptor (if IOC) */ - ioc_selection_t selection_type; /* Cache our IOC selection criteria */ + app_layout_t * app_layout; /* Pointer to our layout struct */ + bool rank_is_ioc; /* Indicates that we host an IOC */ + int subfile_rank; /* Valid only if rank_is_ioc */ + int n_io_concentrators; /* Number of IO concentrators */ + int * io_concentrator; /* Vector of ranks which are IOCs */ + int * subfile_fd; /* file descriptor (if IOC) */ + ioc_selection_t selection_type; /* Cache our IOC selection criteria */ } sf_topology_t; typedef struct { - hid_t sf_context_id; /* Generated context ID which embeds the cache index */ - uint64_t h5_file_id; /* GUID (basically the inode value) */ - int sf_fid; /* value returned by open(file,..) */ - size_t sf_write_count; /* Statistics: write_count */ - size_t sf_read_count; /* Statistics: read_count */ - size_t sf_eof; /* File eof */ - int64_t sf_stripe_size; /* Stripe-depth */ - int64_t sf_blocksize_per_stripe; /* Stripe-depth X n_IOCs */ - MPI_Comm sf_msg_comm; /* MPI comm used to send RPC msg */ - MPI_Comm sf_data_comm; /* MPI comm used to move data */ - MPI_Comm sf_group_comm; /* Not used: for IOC collectives */ - MPI_Comm sf_intercomm; /* Not used: for msgs to all IOC */ - int sf_group_size; /* IOC count (in sf_group_comm) */ - int sf_group_rank; /* IOC rank (in sf_group_comm) */ - int sf_intercomm_root; /* Not used: for IOC comms */ - char *subfile_prefix; /* If subfiles are node-local */ - char *filename; /* The user supplied file name */ - sf_topology_t *topology; /* pointer to our topology */ + hid_t sf_context_id; /* Generated context ID which embeds the cache index */ + uint64_t h5_file_id; /* GUID (basically the inode value) */ + int sf_fid; /* value returned by open(file,..) */ + size_t sf_write_count; /* Statistics: write_count */ + size_t sf_read_count; /* Statistics: read_count */ + haddr_t sf_eof; /* File eof */ + int64_t sf_stripe_size; /* Stripe-depth */ + int64_t sf_blocksize_per_stripe; /* Stripe-depth X n_IOCs */ + int64_t sf_base_addr; /* For an IOC, our base address */ + MPI_Comm sf_msg_comm; /* MPI comm used to send RPC msg */ + MPI_Comm sf_data_comm; /* MPI comm used to move data */ + MPI_Comm sf_group_comm; /* Not used: for IOC collectives */ + MPI_Comm sf_intercomm; /* Not used: for msgs to all IOC */ + int sf_group_size; /* IOC count (in sf_group_comm) */ + int sf_group_rank; /* IOC rank (in sf_group_comm) */ + int sf_intercomm_root; /* Not used: for IOC comms */ + char * subfile_prefix; /* If subfiles are node-local */ + char * sf_filename; /* A generated subfile name */ + char * h5_filename; /* The user supplied file name */ + sf_topology_t *topology; /* pointer to our topology */ + } subfiling_context_t; /* The following is a somewhat augmented input (by the IOC) which captures @@ -323,21 +325,21 @@ typedef struct { * an easy gathering of statistics by the IO Concentrator. */ typedef struct { - /* {Datasize, Offset, FileID} */ - int64_t header[3]; /* The basic RPC input plus */ - int tag; /* the supplied OPCODE tag */ - int source; /* Rank of who sent the message */ - int subfile_rank; /* The IOC rank */ - hid_t context_id; /* context to be used to complete */ - double start_time; /* the request, + time of receipt */ - /* from which we calc Time(queued) */ - void *buffer; /* for writes, we keep the buffer */ - int completed; /* around for awhile... */ + /* {Datasize, Offset, FileID} */ + int64_t header[3]; /* The basic RPC input plus */ + int tag; /* the supplied OPCODE tag */ + int source; /* Rank of who sent the message */ + int subfile_rank; /* The IOC rank */ + hid_t context_id; /* context to be used to complete */ + double start_time; /* the request, + time of receipt */ + /* from which we calc Time(queued) */ + void *buffer; /* for writes, we keep the buffer */ + int completed; /* around for awhile... */ } sf_work_request_t; -typedef struct { /* Format of a context map entry */ - uint64_t h5_file_id; /* key value (linear search of the cache) */ - hid_t sf_context_id; /* The return value if matching h5_file_id */ +typedef struct { /* Format of a context map entry */ + uint64_t h5_file_id; /* key value (linear search of the cache) */ + hid_t sf_context_id; /* The return value if matching h5_file_id */ } file_map_to_context_t; /* @@ -349,11 +351,11 @@ typedef struct { /* Format of a context map entry */ * allow me to get the inode info. */ typedef struct H5FD_sec2_t { - H5FD_t pub; /* public stuff, must be first */ - int fd; /* the filesystem file descriptor */ + H5FD_t pub; /* public stuff, must be first */ + int fd; /* the filesystem file descriptor */ } H5FD_sec2_t; -extern int sf_verbose_flag; +extern int sf_verbose_flag; extern atomic_int sf_work_pending; extern atomic_int sf_file_open_count; extern atomic_int sf_file_close_count; -- cgit v0.12