diff options
author | Richard Warren <Richard.Warren@hdfgroup.org> | 2020-09-25 11:45:15 (GMT) |
---|---|---|
committer | Richard Warren <Richard.Warren@hdfgroup.org> | 2020-09-25 11:45:15 (GMT) |
commit | 5dd85abb4124f337eea52ef33d0c7e3fd67ed92d (patch) | |
tree | 7146b025f19d5d5bb5f85428f2cf6d6d91ceca19 /src/H5FDsubfile.c | |
parent | d612a249afe1eb4799ccbf725d8309452446dee7 (diff) | |
download | hdf5-5dd85abb4124f337eea52ef33d0c7e3fd67ed92d.zip hdf5-5dd85abb4124f337eea52ef33d0c7e3fd67ed92d.tar.gz hdf5-5dd85abb4124f337eea52ef33d0c7e3fd67ed92d.tar.bz2 |
Create a new branch for the September Subfiling deliverable
Diffstat (limited to 'src/H5FDsubfile.c')
-rw-r--r-- | src/H5FDsubfile.c | 475 |
1 files changed, 268 insertions, 207 deletions
diff --git a/src/H5FDsubfile.c b/src/H5FDsubfile.c index a467533..2b3d44b 100644 --- a/src/H5FDsubfile.c +++ b/src/H5FDsubfile.c @@ -1,273 +1,334 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * Copyright by the Board of Trustees of the University of Illinois. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ -#include "H5FDsubfile_public.h" +/* + * Programmer: Richard Warren <Richard.Warren@hdfgroup.org> + * Wednesday, July 1, 2020 + * + * Purpose: This is part of a parallel subfiling I/O driver. + * + */ -#ifdef H5_HAVE_PARALLEL +#include "H5FDsubfile_public.h" /***********/ /* Headers */ /***********/ -#include "H5private.h" /* Generic Functions */ -#include "H5CXprivate.h" /* API Contexts */ -#include "H5Dprivate.h" /* Datasets */ -#include "H5Eprivate.h" /* Error handling */ -#include "H5Ipublic.h" /* IDs */ -#include "H5Iprivate.h" /* IDs */ -#include "H5MMprivate.h" /* Memory management */ -#include "H5Pprivate.h" /* Property lists */ - -/* +#include "H5CXprivate.h" /* API Contexts */ +#include "H5Dprivate.h" /* Datasets */ +#include "H5Eprivate.h" /* Error handling */ +#include "H5Iprivate.h" /* IDs */ +#include "H5Ipublic.h" /* IDs */ +#include "H5MMprivate.h" /* Memory management */ +#include "H5Pprivate.h" /* Property lists */ +#include "H5private.h" /* Generic Functions */ + +/* ========================================= Private functions -======================================== +========================================= */ -static size_t sf_topology_limit = 4; -static size_t sf_topology_entries = 0; -static sf_topology_t **sf_topology_cache = NULL; +/* Modifiable via environment variable */ +static sf_ioc_selection_t sf_default_ioc_selection = SELECT_IOC_ONE_PER_NODE; -static size_t sf_context_limit = 4; -static size_t sf_context_entries = 0; -static subfiling_context_t **sf_context_cache = NULL; -static hid_t context_id = H5I_INVALID_HID; -static hid_t topology_id = H5I_INVALID_HID; +/* +----------------------------------------------------------------------------------- +sf_topology_limit -- How many different topologies can be recorded (default = +4) sf_topology_entries -- The number of topologies that are currently recorded. +sf_topology_cache -- Storage for the known topologies +----------------------------------------------------------------------------------- +*/ +static size_t sf_topology_limit = 4; +static sf_topology_t *sf_topology_cache = NULL; +/* +-------------------------------------------------------------------------- +sf_context_limit -- How many contexts can be recorded (default = 4) +sf_context_entries -- The number of contexts that are currently recorded. +sf_context_cache -- Storage for contexts +-------------------------------------------------------------------------- +*/ +static size_t sf_context_limit = 16; +static subfiling_context_t *sf_context_cache = NULL; -static int64_t record_subfiling_object(SF_OBJ_TYPE type, void *obj) +/* +------------------------------------------------------------------------- + Programmer: Richard Warren <Richard.Warren@hdfgroup.org> + Purpose: Return a pointer to the requested storage object. + There are only 2 object types: TOPOLOGY or CONTEXT + structures. An object_id contains the object type + in upper 32 bits and an index value in the lower 32 bits. + Storage for an object is allocated as required. + + Topologies are static, i.e. for any one IO Concentrator + allocation strategy, the results should always be the + same. + FIXME: The one exception to this being the 1 IOC per + N MPI ranks. The value of N can be changed on a per-file + basis, so we need address that at some point. + + Contexts are 1 per open file. If only one file is open + at a time, then we will only use a single context cache + entry. + Errors: returns NULL if input SF_OBJ_TYPE is unrecognized or + a memory allocation error. + + Revision History -- Initial implementation +------------------------------------------------------------------------- +*/ +void * +get_subfiling_object(int64_t object_id) { - size_t index; - int64_t obj_reference; - uint64_t tag; - switch(type) { - case SF_TOPOLOGY: { - if (sf_topology_cache == NULL) { - sf_topology_cache = (sf_topology_t **) - calloc(sf_topology_limit, sizeof(sf_topology_t *)); - } - assert(sf_topology_cache != NULL); - index = sf_topology_entries++; - tag = SF_TOPOLOGY; - obj_reference = (int64_t)((tag << 32) | index); - sf_topology_cache[index] = obj; - return obj_reference; - break; - } - case SF_CONTEXT: { - if (sf_context_cache == NULL) { - sf_context_cache = (subfiling_context_t **) - calloc(sf_context_limit, sizeof(subfiling_context_t *)); - } - assert(sf_context_cache != NULL); - index = sf_context_entries++; - tag = SF_CONTEXT; - obj_reference = (int64_t)((tag << 32) | index); - sf_context_cache[index] = (subfiling_context_t *)obj; - return obj_reference; - break; - } - default: - puts("UNKNOWN Subfiling object type"); - } - - return -1; + int obj_type = (int) ((object_id >> 32) & 0x0FFFF); + /* We don't require a large indexing space + * 16 bits should be enough.. + */ + size_t index = (object_id & 0x0FFFF); + if (obj_type == SF_TOPOLOGY) { + if (sf_topology_cache == NULL) { + sf_topology_cache = (sf_topology_t *) calloc( + sf_topology_limit, sizeof(sf_topology_t)); + assert(sf_topology_cache != NULL); + } + if (index < sf_topology_limit) { + return (void *) &sf_topology_cache[index]; + } else { + puts("Illegal toplogy object index"); + } + } else if (obj_type == SF_CONTEXT) { + if (sf_context_cache == NULL) { + sf_context_cache = (subfiling_context_t *) calloc( + sf_context_limit, sizeof(subfiling_context_t)); + assert(sf_context_cache != NULL); + } + if (index == sf_context_limit) { + sf_context_limit *= 2; + sf_context_cache = (subfiling_context_t *) realloc(sf_context_cache, + sf_context_limit * sizeof(subfiling_context_t)); + assert(sf_context_cache != NULL); + } else { + return (void *) &sf_context_cache[index]; + } + } else { + printf( + "get_subfiling_object: UNKNOWN Subfiling object type id = 0x%lx\n", + object_id); + } + return NULL; } -/* -========================================= +/* +====================================================== Public vars (for subfiling) and functions -======================================== +We probably need a function to set and clear this +====================================================== */ - int sf_verbose_flag = 0; /* -========================================= +====================================================== File functions -========================================= The pread and pwrite posix functions are described as -being thread safe. We include mutex locks and unlocks -to work around any potential threading conflicts... -Those however, are compiled according #ifdef +being thread safe. +====================================================== */ - -int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +int +sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, + int subfile_rank) { - int ret = 0; + int ret = 0; ssize_t bytes_read; - ssize_t bytes_remaining = (ssize_t)data_size; - char *this_buffer = data_buffer; + ssize_t bytes_remaining = (ssize_t) data_size; + char * this_buffer = data_buffer; + + while (bytes_remaining) { + if ((bytes_read = (ssize_t) pread( + fd, this_buffer, (size_t) bytes_remaining, file_offset)) < 0) { - while(bytes_remaining) { - if ((bytes_read = (ssize_t)pread(fd, this_buffer, (size_t)bytes_remaining, file_offset)) < 0) { perror("pread failed!"); + printf("[ioc(%d) %s] pread(fd, buf, bytes_remaining=%ld, " + "file_offset =%ld)\n", + subfile_rank, __func__, bytes_remaining, file_offset); fflush(stdout); - } - else if (bytes_read > 0) { - if (sf_verbose_flag) { - printf("[ioc(%d) %s] read %ld bytes of %ld requested\n", - subfile_rank, __func__, - bytes_read, bytes_remaining); - } + return -1; + } else if (bytes_read > 0) { bytes_remaining -= bytes_read; this_buffer += bytes_read; file_offset += bytes_read; - } - else { - printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", subfile_rank, __func__ ); + } else { + printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", + subfile_rank, __func__); fflush(stdout); - break; + return -2; } } return ret; } -int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank) +int +sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, + int subfile_rank) { - int ret = 0; - char *this_data = (char *)data_buffer; + int ret = 0; + char * this_data = (char *) data_buffer; ssize_t bytes_remaining = (ssize_t) data_size; ssize_t written = 0; - while(bytes_remaining) { - if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) { + while (bytes_remaining) { + if ((written = pwrite( + fd, this_data, (size_t) bytes_remaining, file_offset)) < 0) { perror("pwrite failed!"); + printf("[ioc(%d) %s] pwrite(fd, data, bytes_remaining=%ld, " + "file_offset =%ld)\n", + subfile_rank, __func__, bytes_remaining, file_offset); fflush(stdout); - break; - } - else { - if (sf_verbose_flag) { - printf("[ioc(%d) %s] wrote %ld bytes of %ld requested\n", - subfile_rank, __func__, - written, bytes_remaining); - } + return -1; + } else { bytes_remaining -= written; this_data += written; file_offset += written; } } + /* We don't usually use this for each file write. We usually do the file + * flush as part of file close operation. + */ #ifdef SUBFILE_REQUIRE_FLUSH fdatasync(fd); #endif - return ret; } - - - -void * get_subfiling_object(int64_t object_id) +/* +------------------------------------------------------------------------- + Programmer: Richard Warren <Richard.Warren@hdfgroup.org> + Purpose: Return a character string which represents either the + default selection method: SELECT_IOC_ONE_PER_NODE; or + if the user has selected a method via the environment + variable (H5_IOC_SELECTION_CRITERIA), we return that + along with any optional qualifier with for that method. + + Errors: None. + + Revision History -- Initial implementation +------------------------------------------------------------------------- +*/ +char * +get_ioc_selection_criteria(sf_ioc_selection_t *selection) { - int obj_type = (int)((object_id >> 32) & 0x0FFFF); - /* We don't require a large indexing space - * 16 bits should be enough.. - */ - size_t index = (object_id & 0x0FFFF); - if (obj_type == SF_TOPOLOGY) { - if (index < sf_context_entries) { - return (void *)sf_topology_cache[index]; - } - else { - puts("Illegal object index"); - } - } - else if (obj_type == SF_CONTEXT) { - if (index < sf_context_entries) { - return (void *)sf_context_cache[index]; - } - else { - puts("Illegal object index"); - } - } - else { - puts("UNKNOWN Subfiling object type"); - } - return NULL; + char *optValue = NULL; + char *envValue = HDgetenv("H5_IOC_SELECTION_CRITERIA"); + + /* For non-default options, the environment variable + * should have the following form: integer:[integer|string] + * In particular, EveryNthRank == 1:64 or every 64 ranks assign an IOC + * or WithConfig == 2:/<full_path_to_config_file> + */ + if (envValue && (optValue = strchr(envValue, ':'))) { + *optValue++ = 0; + } + if (envValue) { + int checkValue = atoi(envValue); + if ((checkValue < 0) || (checkValue >= ioc_selection_options)) { + *selection = sf_default_ioc_selection; + return NULL; + } else { + *selection = (sf_ioc_selection_t) checkValue; + return optValue; + } + } + *selection = sf_default_ioc_selection; + return NULL; } +/* +------------------------------------------------------------------------- + Programmer: Richard Warren <Richard.Warren@hdfgroup.org> + Purpose: Called as part of a file open operation, we initialize a + subfiling context which includes the application topology + along with other relevant info such as the MPI objects + (communicators) for communicating with IO concentrators. + We also identify which MPI ranks will have IOC threads + started on them. + + We return a context ID via the 'sf_context' variable. + + Errors: returns an error if we detect any initialization errors, + including malloc failures or any resource allocation + problems. + + Revision History -- Initial implementation +------------------------------------------------------------------------- +*/ herr_t -H5FDsubfiling_init(void) +H5FDsubfiling_init(sf_ioc_selection_t ioc_select_method, char *ioc_select_option, + int64_t *sf_context) { - herr_t ret_value = SUCCEED; - int ioc_count; - int world_rank, world_size; - sf_topology_t *thisApp = NULL; - subfiling_context_t *newContext = NULL; - - FUNC_ENTER_API(FAIL) - H5TRACE0("e",""); - - if (MPI_Comm_size(MPI_COMM_WORLD, &world_size) != MPI_SUCCESS) { - puts("MPI_Comm_size returned an error"); - ret_value = FAIL; - goto done; - } - if (MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) != MPI_SUCCESS) { - puts("MPI_Comm_rank returned an error"); - ret_value = FAIL; - goto done; - } - if ((ioc_count = H5FD__determine_ioc_count (world_size, world_rank, &thisApp)) > 0) { - topology_id = (hid_t)record_subfiling_object(SF_TOPOLOGY, thisApp); - } - if (topology_id < 0) { - puts("Unable to register subfiling topology!"); - ret_value = FAIL; - goto done; - } - if (H5FD__init_subfile_context(&newContext, ioc_count, world_size, world_rank, thisApp->rank_is_ioc) != SUCCEED) { - puts("Unable to initialize a subfiling context!"); - ret_value = FAIL; - goto done; - } - context_id = (hid_t)record_subfiling_object(SF_CONTEXT, newContext); - if (context_id < 0) { - ret_value = FAIL; - puts("Unable to register subfiling context!"); - } - -done: - FUNC_LEAVE_API(ret_value) + herr_t ret_value = SUCCEED; + int ioc_count; + int world_rank, world_size; + sf_topology_t * thisApp = NULL; + int active_file_maps = active_map_entries(); + int64_t tag = SF_CONTEXT; + int64_t context_id = ((tag << 32) | active_file_maps); + subfiling_context_t *newContext = + (subfiling_context_t *) get_subfiling_object(context_id); + + FUNC_ENTER_API(FAIL) + H5TRACE3("e", "x*s*IL", ioc_select_method, ioc_select_option, sf_context); + + if (MPI_Comm_size(MPI_COMM_WORLD, &world_size) != MPI_SUCCESS) { + puts("MPI_Comm_size returned an error"); + ret_value = FAIL; + goto done; + } + if (MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) != MPI_SUCCESS) { + puts("MPI_Comm_rank returned an error"); + ret_value = FAIL; + goto done; + } - return ret_value; -} + if ((ioc_count = H5FD__determine_ioc_count(world_size, world_rank, + ioc_select_method, ioc_select_option, &thisApp)) <= 0) { + puts("Unable to register subfiling topology!"); + ret_value = FAIL; + goto done; + } -herr_t -H5FDsubfiling_finalize(void) -{ - herr_t ret_value = SUCCEED; /* Return value */ - sf_topology_t *thisApp = NULL; - - FUNC_ENTER_API(FAIL) - H5TRACE0("e",""); - - /* Shutdown the IO Concentrator threads */ - - if (topology_id != H5I_INVALID_HID) { - thisApp = get_subfiling_object(topology_id); - } - - if (thisApp && thisApp->rank_is_ioc) { - begin_thread_exclusive(); - sf_shutdown_flag = 1; - end_thread_exclusive(); - - usleep(100); - - wait_for_thread_main(); - } - - MPI_Barrier(MPI_COMM_WORLD); - - delete_subfiling_context(context_id); + newContext->sf_context_id = context_id; + + if (H5FD__init_subfile_context( + thisApp, ioc_count, world_rank, newContext) != SUCCEED) { + puts("Unable to initialize a subfiling context!"); + ret_value = FAIL; + goto done; + } + + if (newContext->topology->rank_is_ioc) { + int status = initialize_ioc_threads(newContext); + if (status) + goto done; + } + + if (context_id < 0) { + ret_value = FAIL; + + goto done; + } + *sf_context = context_id; - FUNC_LEAVE_API(ret_value) done: - return ret_value; -} + FUNC_LEAVE_API(ret_value) -hid_t -get_subfiling_context(void) -{ - return context_id; + return ret_value; } - -#endif /* H5_HAVE_PARALLEL */ |