summaryrefslogtreecommitdiffstats
path: root/src/H5FDsubfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/H5FDsubfile.c')
-rw-r--r--src/H5FDsubfile.c250
1 files changed, 250 insertions, 0 deletions
diff --git a/src/H5FDsubfile.c b/src/H5FDsubfile.c
new file mode 100644
index 0000000..2b94f0d
--- /dev/null
+++ b/src/H5FDsubfile.c
@@ -0,0 +1,250 @@
+
+#include "H5FDsubfile_public.h"
+
+#ifdef H5_HAVE_PARALLEL
+
+/***********/
+/* Headers */
+/***********/
+#include "H5private.h" /* Generic Functions */
+#include "H5CXprivate.h" /* API Contexts */
+#include "H5Dprivate.h" /* Datasets */
+#include "H5Eprivate.h" /* Error handling */
+#include "H5Ipublic.h" /* IDs */
+#include "H5Iprivate.h" /* IDs */
+#include "H5MMprivate.h" /* Memory management */
+#include "H5Pprivate.h" /* Property lists */
+
+/*
+=========================================
+Private functions
+========================================
+*/
+
+static size_t sf_topology_limit = 4;
+static size_t sf_topology_entries = 0;
+static sf_topology_t **sf_topology_cache = NULL;
+
+static size_t sf_context_limit = 4;
+static size_t sf_context_entries = 0;
+static subfiling_context_t **sf_context_cache = NULL;
+static hid_t context_id = H5I_INVALID_HID;
+static hid_t topology_id = H5I_INVALID_HID;
+
+
+static int64_t record_subfiling_object(SF_OBJ_TYPE type, void *obj)
+{
+ size_t index;
+ int64_t obj_reference;
+ uint64_t tag;
+ switch(type) {
+ case SF_TOPOLOGY: {
+ if (sf_topology_cache == NULL) {
+ sf_topology_cache = (sf_topology_t **)
+ calloc(sf_topology_limit, sizeof(sf_topology_t *));
+ }
+ assert(sf_topology_cache != NULL);
+ index = sf_topology_entries++;
+ tag = SF_TOPOLOGY;
+ obj_reference = (int64_t)((tag << 32) | index);
+ sf_topology_cache[index] = obj;
+ return obj_reference;
+ break;
+ }
+ case SF_CONTEXT: {
+ if (sf_context_cache == NULL) {
+ sf_context_cache = (subfiling_context_t **)
+ calloc(sf_context_limit, sizeof(subfiling_context_t *));
+ }
+ assert(sf_context_cache != NULL);
+ index = sf_context_entries++;
+ tag = SF_CONTEXT;
+ obj_reference = (int64_t)((tag << 32) | index);
+ sf_context_cache[index] = (subfiling_context_t *)obj;
+ return obj_reference;
+ break;
+ }
+ default:
+ puts("UNKNOWN Subfiling object type");
+ }
+
+ return -1;
+}
+
+/*
+=========================================
+Public vars (for subfiling) and functions
+========================================
+*/
+
+int sf_verbose_flag = 0;
+
+/*
+=========================================
+File functions
+=========================================
+
+The pread and pwrite posix functions are described as
+being thread safe. We include mutex locks and unlocks
+to work around any potential threading conflicts...
+Those however, are compiled according #ifdef
+*/
+
+int sf_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+{
+ int ret = 0;
+ ssize_t bytes_read;
+ ssize_t bytes_remaining = (ssize_t)data_size;
+ char *this_buffer = data_buffer;
+
+ while(bytes_remaining) {
+ if ((bytes_read = (ssize_t)pread(fd, this_buffer, (size_t)bytes_remaining, file_offset)) < 0) {
+ perror("pread failed!");
+ fflush(stdout);
+ }
+ else if (bytes_read > 0) {
+ if (sf_verbose_flag) {
+ printf("[ioc(%d) %s] read %ld bytes of %ld requested\n",
+ subfile_rank, __func__,
+ bytes_read, bytes_remaining);
+ }
+ bytes_remaining -= bytes_read;
+ this_buffer += bytes_read;
+ file_offset += bytes_read;
+ }
+ else {
+ printf("[ioc(%d) %s] ERROR! read of 0 bytes == eof!\n", subfile_rank, __func__ );
+ fflush(stdout);
+ break;
+ }
+ }
+ return ret;
+}
+
+int sf_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
+{
+ int ret = 0;
+ char *this_data = (char *)data_buffer;
+ ssize_t bytes_remaining = (ssize_t) data_size;
+ ssize_t written = 0;
+
+ while(bytes_remaining) {
+ if ((written = pwrite(fd, this_data, (size_t)bytes_remaining, file_offset)) < 0) {
+ perror("pwrite failed!");
+ fflush(stdout);
+ }
+ else {
+ if (sf_verbose_flag) {
+ printf("[ioc(%d) %s] wrote %ld bytes of %ld requested\n",
+ subfile_rank, __func__,
+ written, bytes_remaining);
+ }
+ bytes_remaining -= written;
+ this_data += written;
+ file_offset += written;
+ }
+ }
+#ifdef SUBFILE_REQUIRE_FLUSH
+ fdatasync(fd);
+#endif
+
+ return ret;
+}
+
+
+
+
+void * get_subfiling_object(int64_t object_id)
+{
+ int obj_type = (int)((object_id >> 32) & 0x0FFFF);
+ /* We don't require a large indexing space
+ * 16 bits should be enough..
+ */
+ size_t index = (object_id & 0x0FFFF);
+ if (obj_type == SF_TOPOLOGY) {
+ if (index < sf_context_entries) {
+ return (void *)sf_topology_cache[index];
+ }
+ else {
+ puts("Illegal object index");
+ }
+ }
+ else if (obj_type == SF_CONTEXT) {
+ if (index < sf_context_entries) {
+ return (void *)sf_context_cache[index];
+ }
+ else {
+ puts("Illegal object index");
+ }
+ }
+ else {
+ puts("UNKNOWN Subfiling object type");
+ }
+ return NULL;
+}
+
+herr_t
+H5FDsubfiling_init(void)
+{
+ herr_t ret_value = SUCCEED;
+ int ioc_count;
+ int world_rank, world_size;
+ sf_topology_t *thisApp = NULL;
+ subfiling_context_t *newContext = NULL;
+
+
+
+ if (MPI_Comm_size(MPI_COMM_WORLD, &world_size) != MPI_SUCCESS) {
+ puts("MPI_Comm_size returned an error");
+ ret_value = FAIL;
+ goto done;
+ }
+ if (MPI_Comm_rank(MPI_COMM_WORLD, &world_rank) != MPI_SUCCESS) {
+ puts("MPI_Comm_rank returned an error");
+ ret_value = FAIL;
+ goto done;
+ }
+ if ((ioc_count = H5FD__determine_ioc_count (world_size, world_rank, &thisApp)) > 0) {
+ topology_id = (hid_t)record_subfiling_object(SF_TOPOLOGY, thisApp);
+ }
+ if (topology_id < 0) {
+ puts("Unable to register subfiling topology!");
+ ret_value = FAIL;
+ goto done;
+ }
+ if (H5FD__init_subfile_context(&newContext, ioc_count, world_size, world_rank, thisApp->rank_is_ioc) != SUCCEED) {
+ puts("Unable to initialize a subfiling context!");
+ ret_value = FAIL;
+ goto done;
+ }
+ context_id = (hid_t)record_subfiling_object(SF_CONTEXT, newContext);
+ if (context_id < 0) {
+ ret_value = FAIL;
+ puts("Unable to register subfiling context!");
+ }
+
+done:
+ return ret_value;
+}
+
+herr_t
+H5FDsubfiling_finalize(void)
+{
+ herr_t ret_value = SUCCEED; /* Return value */
+
+ /* Shutdown the IO Concentrator threads */
+ sf_shutdown_flag = 1;
+ usleep(100);
+ MPI_Barrier(MPI_COMM_WORLD);
+ delete_subfiling_context(context_id);
+
+ return ret_value;
+}
+
+hid_t
+get_subfiling_context()
+{
+ return context_id;
+}
+
+#endif /* H5_HAVE_PARALLEL */