summaryrefslogtreecommitdiffstats
path: root/utils/mirror_vfd
diff options
context:
space:
mode:
authorJacob Smith <jake.smith@hdfgroup.org>2020-03-13 22:13:17 (GMT)
committerJacob Smith <jake.smith@hdfgroup.org>2020-03-13 22:13:17 (GMT)
commitb65405439d88be6c09fe94360e7fea9856697926 (patch)
treef81fc8aa14e58a3b500cdf64a6a53b7150d8ac3e /utils/mirror_vfd
parent7613f7e1aa89210bb625d59d79a6220c49a1f22c (diff)
downloadhdf5-b65405439d88be6c09fe94360e7fea9856697926.zip
hdf5-b65405439d88be6c09fe94360e7fea9856697926.tar.gz
hdf5-b65405439d88be6c09fe94360e7fea9856697926.tar.bz2
Add Splitter VFD to library.
* "Simultaneous and equivalent" Read-Write and Write-Only channels for file I/O. * Only supports drivers with the H5FD_FEAT_DEFAULT_VFD_COMPATIBLE flag for now, preventing issues with multi-file drivers. Add Mirror VFD to library. * Write-only operations over a network. * Uses TCP/IP sockets. * Server and auxiliary server-shutdown programs provided in a new directory, `utils/mirror_vfd`. * Automated testing via loopback ("remote" of localhost).
Diffstat (limited to 'utils/mirror_vfd')
-rw-r--r--utils/mirror_vfd/CMakeLists.txt64
-rw-r--r--utils/mirror_vfd/Makefile.am30
-rw-r--r--utils/mirror_vfd/mirror_remote.c225
-rw-r--r--utils/mirror_vfd/mirror_remote.h51
-rw-r--r--utils/mirror_vfd/mirror_server.c645
-rw-r--r--utils/mirror_vfd/mirror_server_halten_sie.c214
-rw-r--r--utils/mirror_vfd/mirror_writer.c1064
7 files changed, 2293 insertions, 0 deletions
diff --git a/utils/mirror_vfd/CMakeLists.txt b/utils/mirror_vfd/CMakeLists.txt
new file mode 100644
index 0000000..405c420
--- /dev/null
+++ b/utils/mirror_vfd/CMakeLists.txt
@@ -0,0 +1,64 @@
+cmake_minimum_required (VERSION 3.10)
+project (HDF5_UTILS_MIRRORVFD C)
+
+#-----------------------------------------------------------------------------
+# Add the mirror_server executable
+#-----------------------------------------------------------------------------
+
+set (mirror_server_SOURCES
+ ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_remote.c
+ ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_server.c
+ ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_writer.c)
+add_executable (mirror_server ${mirror_server_SOURCES})
+target_include_directories (mirror_server PRIVATE "${HDF5_UITLS_DIR};${HDF5_SRC_DIR};${HDF5_BINARY_DIR};$<$<BOOL:${HDF5_ENABLE_PARALLEL}>:${MPI_C_INCLUDE_DIRS}>")
+if (NOT BUILD_SHARED_LIBS)
+ TARGET_C_PROPERTIES (mirror_server STATIC)
+ target_link_libraries (mirror_server PRIVATE ${HDF5_TOOLS_LIB_TARGET} ${HDF5_LIB_TARGET})
+else ()
+ TARGET_C_PROPERTIES (mirror_server SHARED)
+ target_link_libraries (mirror_server PRIVATE ${HDF5_TOOLS_LIBSH_TARGET} ${HDF5_LIBSH_TARGET})
+endif ()
+set_target_properties (mirror_server PROPERTIES FOLDER utils)
+set_global_variable (HDF5_UTILS_TO_EXPORT "${HDF5_UTILS_TO_EXPORT};mirror_server")
+set (H5_DEP_EXECUTABLES ${H5_DEP_EXECUTABLES} mirror_server)
+
+#-----------------------------------------------------------------------------
+# Add the mirror_server_halten_sie executable
+#-----------------------------------------------------------------------------
+
+set (mirror_server_halt_SOURCES ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_server_halten_sie.c)
+add_executable (mirror_server_halt ${mirror_server_halt_SOURCES})
+target_include_directories (mirror_server_halt PRIVATE "${HDF5_UITLS_DIR};${HDF5_SRC_DIR};${HDF5_BINARY_DIR};$<$<BOOL:${HDF5_ENABLE_PARALLEL}>:${MPI_C_INCLUDE_DIRS}>")
+if (NOT BUILD_SHARED_LIBS)
+ TARGET_C_PROPERTIES (mirror_server_halt STATIC)
+ target_link_libraries (mirror_server_halt PRIVATE ${HDF5_TOOLS_LIB_TARGET} ${HDF5_LIB_TARGET})
+else ()
+ TARGET_C_PROPERTIES (mirror_server_halt SHARED)
+ target_link_libraries (mirror_server_halt PRIVATE ${HDF5_TOOLS_LIBSH_TARGET} ${HDF5_LIBSH_TARGET})
+endif ()
+set_target_properties (mirror_server_halt PROPERTIES FOLDER utils)
+set_global_variable (HDF5_UTILS_TO_EXPORT "${HDF5_UTILS_TO_EXPORT};mirror_server_halt")
+set (H5_DEP_EXECUTABLES ${H5_DEP_EXECUTABLES} mirror_server_halt)
+
+##############################################################################
+##############################################################################
+### I N S T A L L A T I O N ###
+##############################################################################
+##############################################################################
+
+#-----------------------------------------------------------------------------
+# Rules for Installation of tools using make Install target
+#-----------------------------------------------------------------------------
+if (HDF5_EXPORTED_TARGETS)
+ foreach (exec ${H5_DEP_EXECUTABLES})
+ INSTALL_PROGRAM_PDB (${exec} ${HDF5_INSTALL_BIN_DIR} utilsapplications)
+ endforeach ()
+
+ install (
+ TARGETS
+ ${H5_DEP_EXECUTABLES}
+ EXPORT
+ ${HDF5_EXPORTED_TARGETS}
+ RUNTIME DESTINATION ${HDF5_INSTALL_BIN_DIR} COMPONENT utilsapplications
+ )
+endif ()
diff --git a/utils/mirror_vfd/Makefile.am b/utils/mirror_vfd/Makefile.am
new file mode 100644
index 0000000..448e4cd
--- /dev/null
+++ b/utils/mirror_vfd/Makefile.am
@@ -0,0 +1,30 @@
+#
+# Copyright by The HDF Group.
+# All rights reserved.
+#
+# This file is part of HDF5. The full HDF5 copyright notice, including
+# terms governing use, modification, and redistribution, is contained in
+# the COPYING file, which can be found at the root of the source code
+# distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases.
+# If you do not have access to either file, you may request a copy from
+# help@hdfgroup.org.
+##
+## Makefile.am
+## Run automake to generate a Makefile.in from this file.
+#
+# HDF5 Library Makefile(.in)
+#
+
+include $(top_srcdir)/config/commence.am
+
+AM_CPPFLAGS+=-I$(top_srcdir)/src
+
+bin_PROGRAMS = mirror_server mirror_server_halten_sie
+
+mirror_server_SOURCES = mirror_server.c mirror_writer.c mirror_remote.c
+#mirror_writer_SOURCES = mirror_writer.c mirror_remote.c
+
+# All programs depend on the hdf5 library
+LDADD=$(LIBHDF5)
+
+include $(top_srcdir)/config/conclude.am
diff --git a/utils/mirror_vfd/mirror_remote.c b/utils/mirror_vfd/mirror_remote.c
new file mode 100644
index 0000000..81a3625
--- /dev/null
+++ b/utils/mirror_vfd/mirror_remote.c
@@ -0,0 +1,225 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/* Common operations for "remote" processes for the Mirror VFD.
+ *
+ * Jacob Smith, 2020-03-06
+ */
+
+#include "mirror_remote.h"
+
+#ifdef H5_HAVE_MIRROR_VFD
+
+
+/* ---------------------------------------------------------------------------
+ * Function: mirror_log
+ *
+ * Purpose: Write message to the logging stream/file.
+ * If logging info pointer is NULL, uses logging defaults.
+ * ----------------------------------------------------------------------------
+ */
+void
+mirror_log(struct mirror_log_info *info,
+ unsigned int level,
+ const char *format,
+ ...)
+{
+ FILE *stream = MIRROR_LOG_DEFAULT_STREAM;
+ unsigned int verbosity = MIRROR_LOG_DEFAULT_VERBOSITY;
+ hbool_t custom = FALSE;
+
+ if (info != NULL && info->magic == MIRROR_LOG_INFO_MAGIC) {
+ stream = info->stream;
+ verbosity = info->verbosity;
+ custom = TRUE;
+ }
+
+ if (level == V_NONE) {
+ return;
+ }
+ else
+ if (level <= verbosity) {
+ if (custom == TRUE && info->prefix[0] != '\0') {
+ HDfprintf(stream, "%s", info->prefix);
+ }
+
+ switch (level) {
+ case (V_ERR) :
+ HDfprintf(stream, "ERROR ");
+ break;
+ case (V_WARN) :
+ HDfprintf(stream, "WARNING ");
+ break;
+ default:
+ break;
+ }
+
+ if (format != NULL) {
+ va_list args;
+ HDva_start(args, format);
+ HDvfprintf(stream, format, args);
+ HDva_end(args);
+ }
+
+ HDfprintf(stream, "\n");
+ HDfflush(stream);
+ } /* end if sufficiently verbose to print */
+} /* end mirror_log() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: session_log_bytes
+ *
+ * Purpose: "Pretty-print" raw binary data to logging stream/file.
+ * If info pointer is NULL, uses logging defaults.
+ * ----------------------------------------------------------------------------
+ */
+void
+mirror_log_bytes(struct mirror_log_info *info,
+ unsigned int level,
+ size_t n_bytes,
+ const unsigned char *buf)
+{
+ FILE *stream = MIRROR_LOG_DEFAULT_STREAM;
+ unsigned int verbosity = MIRROR_LOG_DEFAULT_VERBOSITY;
+
+ if (buf == NULL) {
+ return;
+ }
+
+ if (info != NULL && info->magic == MIRROR_LOG_INFO_MAGIC) {
+ stream = info->stream;
+ verbosity = info->verbosity;
+ }
+
+ if (level <= verbosity) {
+ size_t bytes_written = 0;
+ const unsigned char *b = NULL;
+
+ /* print whole lines */
+ while ((n_bytes - bytes_written) >= 32) {
+ b = buf + bytes_written; /* point to region in buffer */
+ HDfprintf(stream,
+ "%04zX %02X%02X%02X%02X %02X%02X%02X%02X" \
+ " %02X%02X%02X%02X %02X%02X%02X%02X" \
+ " %02X%02X%02X%02X %02X%02X%02X%02X" \
+ " %02X%02X%02X%02X %02X%02X%02X%02X\n",
+ bytes_written,
+ b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
+ b[8], b[9], b[10],b[11], b[12],b[13],b[14],b[15],
+ b[16],b[17],b[18],b[19], b[20],b[21],b[22],b[23],
+ b[24],b[25],b[26],b[27], b[28],b[29],b[30],b[31]);
+ bytes_written += 32;
+ }
+
+ /* start partial line */
+ if (n_bytes > bytes_written) {
+ HDfprintf(stream, "%04zX ", bytes_written);
+ }
+
+ /* partial line blocks */
+ while ((n_bytes - bytes_written) >= 4) {
+ HDfprintf(stream, " %02X%02X%02X%02X",
+ buf[bytes_written], buf[bytes_written+1],
+ buf[bytes_written+2], buf[bytes_written+3]);
+ bytes_written += 4;
+ }
+
+ /* block separator before partial block */
+ if (n_bytes > bytes_written) {
+ HDfprintf(stream, " ");
+ }
+
+ /* partial block individual bytes */
+ while (n_bytes > bytes_written) {
+ HDfprintf(stream, "%02X", buf[bytes_written++]);
+ }
+
+ /* end partial line */
+ HDfprintf(stream, "\n");
+ } /* end if suitably verbose to log */
+} /* end mirror_log_bytes() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: mirror_log_init
+ *
+ * Purpose: Prepare a loginfo_t structure for use.
+ *
+ * Return: Success: Pointer to newly-ceated info.
+ * Failure: NULL. Either unable to allocate or cannot open file.
+ * ----------------------------------------------------------------------------
+ */
+loginfo_t *
+mirror_log_init(char *path, char *prefix, unsigned int verbosity)
+{
+ loginfo_t *info = NULL;
+
+ info = (loginfo_t *)HDmalloc(sizeof(loginfo_t));
+ if (info != NULL) {
+ info->magic = MIRROR_LOG_INFO_MAGIC;
+ info->verbosity = verbosity;
+ info->stream = MIRROR_LOG_DEFAULT_STREAM;
+ info->prefix[0] = '\0';
+
+ if (prefix && *prefix) {
+ HDstrncpy(info->prefix, prefix, MIRROR_LOG_PREFIX_MAX);
+ }
+
+ if (path && *path) {
+ FILE *f = NULL;
+ f = HDfopen(path, "w");
+ if (NULL == f) {
+ HDfprintf(MIRROR_LOG_DEFAULT_STREAM,
+ "WARN custom logging path could not be opened: %s\n",
+ path);
+ info->magic += 1;
+ HDfree(info);
+ }
+ else {
+ info->stream = f;
+ }
+ }
+
+ } /* end if able to allocate */
+
+ return info;
+} /* end mirror_log_init() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: mirror_log_term
+ *
+ * Purpose: Shut down and clean up a loginfo_t structure.
+ *
+ * Return: Success: SUCCEED. Resources released.
+ * Failure: FAIL. Indeterminite state.
+ * ----------------------------------------------------------------------------
+ */
+herr_t
+mirror_log_term(loginfo_t *info)
+{
+ if (info == NULL || info->magic != MIRROR_LOG_INFO_MAGIC) {
+ return FAIL;
+ }
+ if (info->stream != stderr || info->stream != stdout) {
+ if (HDfclose(info->stream) < 0) {
+ return FAIL;
+ }
+ }
+ info->magic += 1;
+ HDfree(info);
+ return SUCCEED;
+} /* end mirror_log_term() */
+
+#endif /* H5_HAVE_MIRROR_VFD */
+
diff --git a/utils/mirror_vfd/mirror_remote.h b/utils/mirror_vfd/mirror_remote.h
new file mode 100644
index 0000000..9132d51
--- /dev/null
+++ b/utils/mirror_vfd/mirror_remote.h
@@ -0,0 +1,51 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/* Common definitions for "remote" processes for the Mirror VFD.
+ *
+ * Jacob Smith, 2020-03-06
+ */
+
+#include "hdf5.h"
+#include "H5private.h"
+
+#ifdef H5_HAVE_MIRROR_VFD
+
+#define V_NONE 0
+#define V_ERR 1
+#define V_WARN 2
+#define V_INFO 3
+#define V_ALL 4
+
+#define MIRROR_LOG_DEFAULT_STREAM stdout
+#define MIRROR_LOG_DEFAULT_VERBOSITY V_WARN
+#define MIRROR_LOG_PREFIX_MAX 79
+#define MIRROR_LOG_INFO_MAGIC 0x569D589A
+
+typedef struct mirror_log_info {
+ uint32_t magic;
+ FILE *stream;
+ unsigned int verbosity;
+ char prefix[MIRROR_LOG_PREFIX_MAX+1];
+} loginfo_t;
+
+void mirror_log(loginfo_t *info, unsigned int level,
+ const char *format, ...);
+void mirror_log_bytes(loginfo_t *info, unsigned int level,
+ size_t n_bytes, const unsigned char *buf);
+loginfo_t *mirror_log_init(char *path, char *prefix, unsigned int verbosity);
+int mirror_log_term(loginfo_t *loginfo);
+
+herr_t run_writer(int socketfd, H5FD_mirror_xmit_open_t *xmit_open);
+
+#endif /* H5_HAVE_MIRROR_VFD */
+
diff --git a/utils/mirror_vfd/mirror_server.c b/utils/mirror_vfd/mirror_server.c
new file mode 100644
index 0000000..314f067
--- /dev/null
+++ b/utils/mirror_vfd/mirror_server.c
@@ -0,0 +1,645 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * "Server" application to associate a Mirror VFD Driver with a Writer.
+ *
+ * Server waits on a dedicated port for a Driver to attempt to connect.
+ * When connection is made, reads a message from the Driver.
+ * If message is "SHUTDOWN", Server closes connection and terminates.
+ * Else, if it receives an encoded OPEN xmit (from Driver), the Server forks
+ * itself; the child becomes a dedicated Writer and maintains connection with
+ * the Driver instance, and the parent process remains a Server and returns
+ * to listening for incoming requests.
+ * Else, the message is not recognized and is ignored.
+ *
+ *
+ *
+ * mirror_server [args]
+ *
+ * Primary server for coordinating mirror VFD connections with the remote
+ * process.
+ *
+ * args:
+ * --help, -h Print help message and exit.
+ * --port=N Positive integer for primary listening port.
+ * --verbosity=N Debugging verbosity
+ * 0: none
+ * 1: errors
+ * 2: details
+ * 3: all
+ * --logpath=S File path to direct debugging output, if any.
+ * Default of none prints output to stdout.
+ *
+ */
+
+#include "mirror_remote.h"
+
+#ifdef H5_HAVE_MIRROR_VFD
+
+#define MAXBUF 2048 /* max buffer length. */
+#define LISTENQ 80 /* max pending mirrorS requests */
+#define DEFAULT_PORT 3000 /* default listening port */
+#define MAX_PORT_LOOPS 20 /* max iteratations through port range */
+#define PORT_LOOP_RETRY_DELAY 1 /* seconds to wait between port scans */
+
+/* semi-unique "magic" numbers to sanity-check structure pointers */
+#define OP_ARGS_MAGIC 0xCF074379u
+#define SERVER_RUN_MAGIC 0x741B459Au
+
+
+/* ---------------------------------------------------------------------------
+ * Structure: struct op_args
+ *
+ * Purpose: Convenience structure for holding arguments from command-line.
+ *
+ * `magic` (uint32_t)
+ * Semi-unique number to help validate a pointer to this struct type.
+ * Must be OP_ARGS_MAGIC to be considered valid.
+ *
+ * `help` (int)
+ * Flag that the help argument was present in the command line.
+ *
+ * `main_port` (int)
+ * Flag that the help argument was present in the command line.
+ *
+ * `verbosity` (int)
+ * Number between 0 (none) and 4 (all) that controls how much detail
+ * the program prints as a course of logging.
+ *
+ * `log_prepend_serv` (int)
+ * Flag that the logging messages should have 'S- ' at the start of each
+ * line.
+ *
+ * `log_prepend_type` (int)
+ * Flag that the logging messages should have the assocaited verbosity
+ * level present in the line (e.g., "WARN", "ERROR", or "INFO").
+ *
+ * `log_path` (char *)
+ * Path string from the command line, giving the absolute path
+ * for the file for logging output. Can be empty.
+ *
+ * `writer_log_path` (char *)
+ * Path string from the command line, giving the absolute path
+ * for the file for writer's logging output. Can be empty.
+ *
+ * ---------------------------------------------------------------------------
+ */
+struct op_args {
+ uint32_t magic;
+ int help;
+ int main_port;
+ int verbosity;
+ int log_prepend_serv;
+ int log_prepend_type;
+ char log_path[PATH_MAX+1];
+ char writer_log_path[PATH_MAX+1];
+};
+
+
+/* ---------------------------------------------------------------------------
+ * Structure: struct server_run
+ *
+ * Purpose: Convenience structure for holding information about a server
+ * in operation.
+ *
+ * `magic` (uint32_t)
+ * Semi-unique number to help validate a pointer to this struct type.
+ * Must be SERVER_RUN_MAGIC to be considered valid.
+ *
+ * `log_stream` (FILE *)
+ * File handle where logging output is directed.
+ * By default, is stdout.
+ *
+ * `opts` (struct opt_args)
+ * Contained structure, holds the server's configuration.
+ *
+ * `listenfd` (int)
+ * File descriptor of the listening socket.
+ *
+ * ---------------------------------------------------------------------------
+ */
+struct server_run {
+ uint32_t magic;
+ struct op_args opts;
+ struct mirror_log_info *loginfo;
+ int listenfd;
+};
+
+
+/* ---------------------------------------------------------------------------
+ * Function: usage
+ *
+ * Purpose: Print the usage message to stdout.
+ * ---------------------------------------------------------------------------
+ */
+static void
+usage(void)
+{
+ HDfprintf(stdout,
+ "mirror_server [options]\n" \
+ "\n" \
+ "Application for providing Mirror Writer process to " \
+ " Mirror VFD on file-open.\n" \
+ "Listens on a dedicated socket; forks as a Writer upon receipt" \
+ " of a valid OPEN xmit.\n" \
+ "\n" \
+ "Options:\n" \
+ "--help [-h] : Print this help message and quit.\n" \
+ "--logpath=PATH : File path for logging output " \
+ "(default none, to stdout).\n" \
+ "--port=PORT : Primary port (default %d).\n" \
+ "--verbosity=NUM : Debug printing level " \
+ "0..4, (default %d).\n",
+ DEFAULT_PORT,
+ MIRROR_LOG_DEFAULT_VERBOSITY);
+} /* end usage() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: parse_args
+ *
+ * Purpose: Read command line options and store results in args_out
+ * structure. Fails in event of unrecognized option.
+ *
+ * Return: 0 on success, -1 on failure.
+ * ---------------------------------------------------------------------------
+ */
+static int
+parse_args(int argc, char **argv, struct op_args *args_out)
+{
+ int i;
+
+ /* preset default values
+ */
+ args_out->main_port = DEFAULT_PORT;
+ args_out->help = 0;
+ args_out->log_prepend_serv = 1;
+ args_out->log_prepend_type = 1;
+ args_out->verbosity = MIRROR_LOG_DEFAULT_VERBOSITY;
+ /* preset empty strings */
+ HDbzero(args_out->log_path, PATH_MAX+1);
+ HDbzero(args_out->writer_log_path, PATH_MAX+1);
+
+ if (argv == NULL || *argv == NULL) {
+ mirror_log(NULL, V_ERR, "invalid argv pointer");
+ return -1;
+ }
+
+ /* Loop over arguments after program name and writer_path */
+ for (i=2; i < argc; i++) {
+ if (!HDstrncmp(argv[i], "-h", 3) || !HDstrncmp(argv[i], "--help", 7)) {
+ mirror_log(NULL, V_INFO, "found help argument");
+ args_out->help = 1;
+ return 0;
+ } /* end if help */
+ else
+ if (!HDstrncmp(argv[i], "--port=", 7)) {
+ mirror_log(NULL, V_INFO, "parsing 'main_port' (%s)", argv[i]+7);
+ args_out->main_port = HDatoi(argv[i]+7);
+ } /* end if port */
+ else
+ if (!HDstrncmp(argv[i], "--verbosity=", 12)) {
+ mirror_log(NULL, V_INFO, "parsing 'verbosity' (%s)", argv[i]+12);
+ args_out->verbosity = HDatoi(argv[i]+12);
+ } /* end if verbosity */
+ else
+ if (!HDstrncmp(argv[i], "--logpath=", 10)) {
+ mirror_log(NULL, V_INFO, "parsing 'logpath' (%s)", argv[i]+10);
+ HDstrncpy(args_out->log_path, argv[i]+10, PATH_MAX);
+ } /* end if logpath */
+ else {
+ mirror_log(NULL, V_ERR, "unrecognized argument: %s", argv[i]);
+ return -1;
+ } /* end if unrecognized argument */
+ } /* end for each arg after the path to writer "receiver process" */
+
+ mirror_log(NULL, V_INFO, "all args parsed");
+
+ return 0;
+} /* end parse_args() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: prepare_listening_socket
+ *
+ * Purpose: Configure and open a socket.
+ * In event of error, attempts to undo its processes.
+ *
+ * Return: Success: non-negative (the file descriptor of the socket)
+ * Failure: -1
+ * ---------------------------------------------------------------------------
+ */
+static int
+prepare_listening_socket(struct server_run *run)
+{
+ struct sockaddr_in server_addr;
+ int _true = 1; /* needed for setsockopt() */
+ int ret_value = -1;
+ int ret = 0; /* for checking return value of function calls */
+
+ if (run == NULL || run->magic != SERVER_RUN_MAGIC) {
+ mirror_log(NULL, V_ERR, "invalid server_run pointer");
+ return -1;
+ }
+
+ mirror_log(run->loginfo, V_INFO, "preparing socket");
+
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_addr.s_addr = HDhtonl(INADDR_ANY);
+ server_addr.sin_port = HDhtons((uint16_t)run->opts.main_port);
+
+ mirror_log(run->loginfo, V_INFO, "socket()");
+ ret_value = HDsocket(AF_INET, SOCK_STREAM, 0);
+ if (ret_value < 0) {
+ mirror_log(run->loginfo, V_ERR, "listening socket:%d", ret_value);
+ goto error;
+ }
+
+ mirror_log(run->loginfo, V_ALL, "setsockopt()");
+ HDsetsockopt(ret_value, SOL_SOCKET, SO_REUSEADDR, &_true, sizeof(int));
+
+ mirror_log(run->loginfo, V_INFO, "bind()");
+ ret = HDbind(ret_value, (struct sockaddr *)&server_addr,
+ sizeof(server_addr));
+ if (ret < 0) {
+ mirror_log(run->loginfo, V_ERR, "bind() %s", HDstrerror(errno));
+ goto error;
+ }
+
+ mirror_log(run->loginfo, V_INFO, "listen()");
+ ret = HDlisten(ret_value, LISTENQ);
+ if (ret < 0) {
+ mirror_log(run->loginfo, V_ERR, "H5FD server listen:%d", ret);
+ goto error;
+ }
+
+ return ret_value;
+
+error:
+ if (ret_value >= 0) {
+ HDshutdown(ret_value, SHUT_RDWR);
+ HDclose(ret_value);
+ }
+ return -1;
+} /* end prepare_listening_socket() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: init_server_run
+ *
+ * Purpose: Set up server_run struct with default and specified values.
+ *
+ * Return: Zero (0) if successful, -1 if an error occurred.
+ * ---------------------------------------------------------------------------
+ */
+static struct server_run *
+init_server_run(int argc, char **argv)
+{
+ struct server_run *run;
+
+ run = (struct server_run *)HDmalloc(sizeof(struct server_run));
+ if (run == NULL) {
+ mirror_log(NULL, V_ERR, "can't allocate server_run struct");
+ return NULL;
+ }
+
+ run->magic = (uint32_t)SERVER_RUN_MAGIC;
+ run->opts.magic = (uint32_t)OP_ARGS_MAGIC;
+ run->listenfd = -1;
+
+ if (parse_args(argc, argv, &(run->opts)) < 0) {
+ mirror_log(NULL, V_ERR, "can't parse arguments");
+ usage();
+ goto error;
+ }
+
+ if (run->opts.help) {
+ usage();
+ return run; /* early exit */
+ }
+
+ run->loginfo = mirror_log_init(run->opts.log_path, "s- ",
+ run->opts.verbosity);
+
+ run->listenfd = prepare_listening_socket(run);
+ if (run->listenfd < 0) {
+ mirror_log(NULL, V_ERR, "can't prepare listening socket");
+ goto error;
+ }
+
+ return run;
+
+error:
+ if (run != NULL) {
+ HDfree(run);
+ }
+ return NULL;
+
+} /* end init_server_run() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: term_server_run
+ *
+ * Purpose: Close opened items in a sever_run and release the pointer.
+ *
+ * Return: Zero (0) if successful, -1 if an error occurred.
+ * ---------------------------------------------------------------------------
+ */
+static int
+term_server_run(struct server_run *run)
+{
+ if (run == NULL || run->magic != SERVER_RUN_MAGIC) {
+ mirror_log(NULL, V_ERR, "invalid server_run pointer");
+ return -1;
+ }
+
+ mirror_log(run->loginfo, V_INFO, "shutting down");
+
+ if (run->listenfd >= 0) {
+ HDshutdown(run->listenfd, SHUT_RDWR); /* TODO: error-checking? */
+ HDclose(run->listenfd); /* TODO: error-checking? */
+ run->listenfd = -1;
+ }
+
+ if (mirror_log_term(run->loginfo) < 0) {
+ mirror_log(NULL, V_ERR, "can't close logging stream");
+ return -1; /* doesn't solve the problem, but informs of error */
+ }
+ run->loginfo = NULL;
+
+ (run->magic)++;
+ (run->opts.magic)++;
+ HDfree(run);
+ return 0;
+} /* end term_server_run() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: accept_connection
+ *
+ * Purpose: Main working loop; process requests as they are received.
+ * Does nothing if the run option help is set.
+ *
+ * Return: -1 on error, else a non-negative file descriptor of the socket.
+ * ---------------------------------------------------------------------------
+ */
+static int
+accept_connection(struct server_run *run)
+{
+ struct sockaddr_in client_addr; /**/
+ socklen_t clilen; /**/
+ struct hostent *host_port = NULL; /**/
+ char *hostaddrp; /**/
+ int connfd = -1; /* connection file descriptor */
+
+ if (run == NULL || run->magic != SERVER_RUN_MAGIC) {
+ mirror_log(NULL, V_ERR, "invalid server_run pointer");
+ return -1;
+ }
+
+ /*------------------------------*/
+ /* accept a connection on a socket */
+ clilen = sizeof(client_addr);
+ connfd = HDaccept(run->listenfd, (struct sockaddr *)&client_addr, &clilen);
+ if (connfd < 0) {
+ mirror_log(run->loginfo, V_ERR, "accept:%d", connfd);
+ goto error;
+ }
+ mirror_log(run->loginfo, V_INFO, "connection achieved");
+
+ /*------------------------------*/
+ /* get client address information */
+ host_port = HDgethostbyaddr(
+ (const char *)&client_addr.sin_addr.s_addr,
+ sizeof(client_addr.sin_addr.s_addr),
+ AF_INET);
+ if (host_port == NULL) {
+ mirror_log(run->loginfo, V_ERR, "gethostbyaddr()");
+ goto error;
+ }
+
+ /* function has the string space statically scoped -- OK until next call */
+ hostaddrp = HDinet_ntoa(client_addr.sin_addr);
+ /* TODO? proper error-checking */
+
+ mirror_log(run->loginfo, V_INFO,
+ "server connected with %s (%s)",
+ host_port->h_name,
+ hostaddrp);
+
+ return connfd;
+
+error:
+ if (connfd >= 0) {
+ close(connfd);
+ }
+ return -1;
+} /* end accept_connection() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: wait_for_child
+ *
+ * Purpose: Signal handler to reap zombie processes.
+ * ---------------------------------------------------------------------------
+ */
+static void
+wait_for_child(int sig)
+{
+ while (HDwaitpid(-1, NULL, WNOHANG) > 0);
+} /* end wait_for_child() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: handle_requests
+ *
+ * Purpose: Main working loop; process requests as they are received.
+ * Does nothing if the run option `help` is set.
+ *
+ * Return: -1 on error, else 0 for successful operation.
+ * ---------------------------------------------------------------------------
+ */
+static int
+handle_requests(struct server_run *run)
+{
+ int connfd = -1; /**/
+ char mybuf[H5FD_MIRROR_XMIT_OPEN_SIZE]; /**/
+ int ret; /* general-purpose error-checking */
+ int pid; /* process ID of fork */
+ struct sigaction sa;
+ int ret_value = 0;
+
+ if (run == NULL || run->magic != SERVER_RUN_MAGIC) {
+ mirror_log(NULL, V_ERR, "invalid server_run pointer");
+ return -1;
+ }
+
+ if (run->opts.help) {
+ return 0;
+ }
+
+ if (run->listenfd < 0) {
+ mirror_log(NULL, V_ERR, "invalid listening socket");
+ return -1;
+ }
+
+ /* Set up the signal handler */
+ sa.sa_handler = wait_for_child;
+ HDsigemptyset(&sa.sa_mask);
+ sa.sa_flags = SA_RESTART;
+ if (HDsigaction(SIGCHLD, &sa, NULL) == -1) {
+ perror("sigaction");
+ return 1;
+ }
+
+ /* Keep listening for attempts to connect.
+ */
+
+ while (1) { /* infinite loop, exited via break or goto */
+ mirror_log(run->loginfo, V_INFO, "server waiting for connections...");
+
+ connfd = -1;
+
+ connfd = accept_connection(run);
+ if (connfd < 0) {
+ mirror_log(run->loginfo, V_ERR, "unable to receive connection");
+ goto error;
+ }
+
+ /* Read handshake from port connection.
+ */
+
+ ret = (int)HDread(connfd, &mybuf, H5FD_MIRROR_XMIT_OPEN_SIZE);
+ if (-1 == ret) {
+ mirror_log(run->loginfo, V_ERR, "read:%d", ret);
+ goto error;
+ }
+ mirror_log(run->loginfo, V_INFO, "received %d bytes", ret);
+ mirror_log(run->loginfo, V_ALL, "```");
+ mirror_log_bytes(run->loginfo, V_ALL, ret,
+ (const unsigned char *)mybuf);
+ mirror_log(run->loginfo, V_ALL, "```");
+
+ /* Respond to handshake message.
+ */
+
+ if (!HDstrncmp("SHUTDOWN", mybuf, 8)) {
+ /* Stop operation if told to stop */
+ mirror_log(run->loginfo, V_INFO, "received SHUTDOWN!", ret);
+ HDclose(connfd);
+ connfd = -1;
+ goto done;
+ } /* end if explicit "SHUTDOWN" directive */
+ else
+ if (H5FD_MIRROR_XMIT_OPEN_SIZE == ret) {
+ H5FD_mirror_xmit_open_t xopen;
+
+ mirror_log(run->loginfo, V_INFO,
+ "probable OPEN xmit received");
+
+ H5FD_mirror_xmit_decode_open(&xopen, (const unsigned char *)mybuf);
+ if (FALSE == H5FD_mirror_xmit_is_open(&xopen)) {
+ mirror_log(run->loginfo, V_WARN,
+ "expected OPEN xmit was malformed");
+ HDclose(connfd);
+ continue;
+ }
+
+ mirror_log(run->loginfo, V_INFO,
+ "probable OPEN xmit confirmed");
+
+ pid = HDfork();
+ if (pid < 0) { /* fork error */
+ mirror_log(run->loginfo, V_ERR, "cannot fork");
+ goto error;
+ } /* end if fork error */
+ else
+ if (pid == 0) { /* child process (writer side of fork) */
+ mirror_log(run->loginfo, V_INFO,
+ "executing writer");
+ if (run_writer(connfd, &xopen) < 0) {
+ HDprintf("can't run writer\n");
+ }
+ else {
+ HDprintf("writer OK\n");
+ }
+ HDclose(connfd);
+
+ HDexit(EXIT_SUCCESS);
+ } /* end if writer side of fork */
+ else { /* parent process (server side of fork) */
+ mirror_log(run->loginfo, V_INFO, "tidying up from handshake");
+ HDclose(connfd);
+ } /* end if server side of fork */
+
+ } /* end else-if valid request for service */
+ else {
+ /* Ignore unrecognized messages */
+ HDclose(connfd);
+ continue;
+ } /* end else (not a valid message, to be ignored) */
+
+ } /* end while listening for new connections */
+
+done:
+ if (connfd >= 0) {
+ mirror_log(run->loginfo, V_WARN, "connfd still open upon cleanup");
+ HDclose(connfd);
+ }
+
+ return ret_value;
+
+error:
+ if (connfd >= 0) {
+ HDclose(connfd);
+ }
+ return -1;
+} /* end handle_requests() */
+
+
+/* ------------------------------------------------------------------------- */
+int
+main(int argc, char **argv)
+{
+ struct server_run *run;
+
+ run = init_server_run(argc, argv);
+ if (NULL == run) {
+ mirror_log(NULL, V_ERR, "can't initialize run");
+ HDexit(EXIT_FAILURE);
+ }
+
+ if (handle_requests(run) < 0) {
+ mirror_log(run->loginfo, V_ERR, "problem handling requests");
+ }
+
+ if (term_server_run(run) < 0) {
+ mirror_log(NULL, V_ERR, "problem closing server run");
+ HDexit(EXIT_FAILURE);
+ }
+
+ HDexit(EXIT_SUCCESS);
+} /* end main() */
+
+#else /* H5_HAVE_MIRROR_VFD */
+
+int
+main(int argc, char **argv)
+{
+ HDprintf("Mirror VFD was not built -- cannot launch server.\n");
+ HDexit(EXIT_FAILURE);
+}
+
+#endif /* H5_HAVE_MIRROR_VFD */
+
diff --git a/utils/mirror_vfd/mirror_server_halten_sie.c b/utils/mirror_vfd/mirror_server_halten_sie.c
new file mode 100644
index 0000000..ccd5824
--- /dev/null
+++ b/utils/mirror_vfd/mirror_server_halten_sie.c
@@ -0,0 +1,214 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * Purpose: Stop the mirror server
+ * Exists for cross-platform, optionally remote shutdown.
+ */
+
+#include "H5private.h" /* System compatability call-wrapper macros */
+
+#ifdef H5_HAVE_MIRROR_VFD
+
+#define MSHS_OPTS_MAGIC 0x613B1C15u /* sanity-checking constant */
+#define MSHS_IP_STR_SIZE 20
+#define MSHS_DEFAULT_IP "127.0.0.1"
+#define MSHS_DEFAULT_PORTNO 3000
+
+
+/* ----------------------------------------------------------------------------
+ * Structure: struct mshs_opts
+ *
+ * Purpose: Convenience structure to hold options as parsed from the
+ * command line.
+ *
+ * `magic` (uint32_t)
+ * Semi-unique constant to help verify pointer integrity.
+ *
+ * `help` (int)
+ * Flag that the help argument was present.
+ *
+ * `portno` (int)
+ * Port number, as received from arguments.
+ *
+ * `ip` (char *)
+ * IP address string as received from arguments.
+ *
+ * ----------------------------------------------------------------------------
+ */
+struct mshs_opts {
+ uint32_t magic;
+ int help;
+ int portno;
+ char ip[MSHS_IP_STR_SIZE + 1];
+};
+
+
+/* ----------------------------------------------------------------------------
+ * Function: usage
+ *
+ * Purpose: Print usage message to stdout.
+ * ----------------------------------------------------------------------------
+ */
+static void
+usage(void)
+{
+ HDprintf("mirror_server_halten_sie [options]\n" \
+ "System-independent Mirror Server shutdown program.\n" \
+ "Sends shutdown message to Mirror Server at given IP:port\n" \
+ "\n" \
+ "Options:\n" \
+ " -h | --help Print this usage message and exit.\n" \
+ " --ip=ADDR IP Address of remote server (defaut %s)\n" \
+ " --port=PORT Handshake port of remote server (default %d)\n",
+ MSHS_DEFAULT_IP,
+ MSHS_DEFAULT_PORTNO);
+} /* end usage() */
+
+
+/* ----------------------------------------------------------------------------
+ * Function: parse_args
+ *
+ * Purpose: Parse command-line arguments, populating the options struct
+ * pointer as appropriate.
+ * Default values will be set for unspecified options.
+ *
+ * Return: 0 on success, negative (-1) if error.
+ * ----------------------------------------------------------------------------
+ */
+static int
+parse_args(int argc, char **argv, struct mshs_opts *opts)
+{
+ int i = 0;
+
+ opts->magic = MSHS_OPTS_MAGIC;
+ opts->help = 0;
+ opts->portno = MSHS_DEFAULT_PORTNO;
+ HDstrncpy(opts->ip, MSHS_DEFAULT_IP, MSHS_IP_STR_SIZE);
+
+ for (i=1; i < argc; i++) { /* start with first possible option argument */
+ if (!HDstrncmp(argv[i], "-h", 3) || !HDstrncmp(argv[i], "--help", 7)) {
+ opts->help = 1;
+ }
+ else
+ if (!HDstrncmp(argv[i], "--ip=", 5)) {
+ HDstrncpy(opts->ip, argv[i]+5, MSHS_IP_STR_SIZE);
+ }
+ else
+ if (!HDstrncmp(argv[i], "--port=", 7)) {
+ opts->portno = HDatoi(argv[i]+7);
+ }
+ else {
+ HDprintf("Unrecognized option: '%s'\n", argv[i]);
+ usage();
+ opts->magic++; /* invalidate for sanity */
+ return -1;
+ }
+ } /* end for each argument from command line */
+
+ /* auto-replace 'localhost' with numeric IP */
+ if (!HDstrncmp(opts->ip, "localhost", 10)) { /* include null terminator */
+ HDstrncpy(opts->ip, "127.0.0.1", MSHS_IP_STR_SIZE);
+ }
+
+ return 0;
+} /* end parse_args() */
+
+
+/* ----------------------------------------------------------------------------
+ * Function: send_shutdown
+ *
+ * Purpose: Create socket and send shutdown signal to remote server.
+ *
+ * Return: 0 on success, negative (-1) if error.
+ * ----------------------------------------------------------------------------
+ */
+static int
+send_shutdown(struct mshs_opts *opts)
+{
+ int live_socket;
+ struct sockaddr_in target_addr;
+
+ if (opts->magic != MSHS_OPTS_MAGIC) {
+ HDprintf("invalid options structure\n");
+ return -1;
+ }
+
+ live_socket = HDsocket(AF_INET, SOCK_STREAM, 0);
+ if (live_socket < 0) {
+ HDprintf("ERROR socket()\n");
+ return -1;
+ }
+
+ target_addr.sin_family = AF_INET;
+ target_addr.sin_port = HDhtons((uint16_t)opts->portno);
+ target_addr.sin_addr.s_addr = HDinet_addr(opts->ip);
+ HDmemset(target_addr.sin_zero, '\0', sizeof(target_addr.sin_zero));
+
+ if (HDconnect(live_socket, (struct sockaddr *)&target_addr,
+ (socklen_t)sizeof(target_addr))
+ < 0)
+ {
+ HDprintf("ERROR connect() (%d)\n%s\n", errno, HDstrerror(errno));
+ return -1;
+ }
+
+ if (HDwrite(live_socket, "SHUTDOWN", 9) == -1) {
+ HDprintf("ERROR write() (%d)\n%s\n", errno, HDstrerror(errno));
+ return -1;
+ }
+
+ if (HDclose(live_socket) < 0) {
+ HDprintf("ERROR close() can't close socket\n");
+ return -1;
+ }
+
+ return 0;
+} /* end send_shutdown() */
+
+
+/* ------------------------------------------------------------------------- */
+int
+main(int argc, char **argv)
+{
+ struct mshs_opts opts;
+
+ if (parse_args(argc, argv, &opts) < 0) {
+ HDprintf("Unable to parse arguments\n");
+ return EXIT_FAILURE;
+ }
+
+ if (opts.help) {
+ usage();
+ return EXIT_SUCCESS;
+ }
+
+ if (send_shutdown(&opts) < 0) {
+ HDprintf("Unable to send shutdown command\n");
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+} /* end main() */
+
+#else /* H5_HAVE_MIRROR_VFD */
+
+
+/* ------------------------------------------------------------------------- */
+int
+main(int argc, char **argv)
+{
+ HDprintf("Mirror VFD not built -- unable to perform shutdown.\n");
+ return EXIT_FAILURE;
+}
+
+#endif /* H5_HAVE_MIRROR_VFD */
diff --git a/utils/mirror_vfd/mirror_writer.c b/utils/mirror_vfd/mirror_writer.c
new file mode 100644
index 0000000..9cd9b4c
--- /dev/null
+++ b/utils/mirror_vfd/mirror_writer.c
@@ -0,0 +1,1064 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Copyright by The HDF Group. *
+ * All rights reserved. *
+ * *
+ * This file is part of HDF5. The full HDF5 copyright notice, including *
+ * terms governing use, modification, and redistribution, is contained in *
+ * the COPYING file, which can be found at the root of the source code *
+ * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. *
+ * If you do not have access to either file, you may request a copy from *
+ * help@hdfgroup.org. *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+/*
+ * Remote writer process for the mirror (socket) VFD.
+ *
+ * Writer is started with arguments for slaved port.
+ * Awaits a connection on the socket.
+ * Handles instructions from the master 'Driver' process.
+ *
+ * Current implementation uses Sec2 as the underlying driver when opening a
+ * file. This is reflected in the source (H5FDmirror.c) of the Mirror driver.
+ */
+
+#include "mirror_remote.h"
+
+#ifdef H5_HAVE_MIRROR_VFD
+
+#define HEXDUMP_XMITS 1 /* Toggle whether to print xmit bytes-blob */
+ /* in detailed logging */
+#define HEXDUMP_WRITEDATA 0 /* Toggle whether to print bytes to write */
+ /* in detailed logging */
+#define LISTENQ 80 /* max pending Driver requests */
+
+#define MW_SESSION_MAGIC 0x88F36B32u
+#define MW_SOCK_COMM_MAGIC 0xDF10A157u
+#define MW_OPTS_MAGIC 0x3BA8B462u
+
+
+/* ---------------------------------------------------------------------------
+ * Structure: struct mirror_session
+ *
+ * Bundle of information used to manage the operation of this remote Writer
+ * in a "session" with the Driver process.
+ *
+ * magic (uint32_t)
+ * Semi-unique "magic" number used to sanity-check a structure for
+ * validity. MUST equal MW_SESSION_MAGIC to be valid.
+ *
+ * sockfd (int)
+ * File descriptor to the socket.
+ * Used for receiving bytes from and writing bytes to the Driver
+ * across the network.
+ * If not NULL, should be a valid descriptor.
+ *
+ * token (uint32t)
+ * Number used to help sanity-check received transmission from the Writer.
+ * Each Driver/Writer pairing should have a semi-unique "token" to help
+ * guard against commands from the wrong entity.
+ *
+ * xmit_count (uint32_t)
+ * Record of trasmissions received from the Driver. While the transmission
+ * protocol should be trustworthy, this serves as an additional guard.
+ * Starts a 0 and should be incremented for each one-way transmission.
+ *
+ * file (H5FD_t *)
+ * Virtual File handle for the hdf5 file.
+ * Set on file open if H5Fopen() is successful. If NULL, it is invalid.
+ *
+ * log_verbosity (unsigned int)
+ * The verbosity level for logging. Should be set to one of the values
+ * defined at the top of this file.
+ *
+ * log_stream (FILE *)
+ * File pointer to which logging output is written. Starts (and ends)
+ * with a default stream, such as stdout, but can be overridden at
+ * runtime.
+ *
+ * reply (H5FD_mirror_xmit_reply_t)
+ * Structure space for persistent reply data.
+ * Should be initialized with basic header info (magic, version, op),
+ * then with session info (token, xmit count), and finally with specific
+ * reply info (update xmit_count, status code, and message) before
+ * transmission.
+ *
+ * ----------------------------------------------------------------------------
+ */
+struct mirror_session {
+ uint32_t magic;
+ int sockfd;
+ uint32_t token;
+ uint32_t xmit_count;
+ H5FD_t *file;
+ loginfo_t *loginfo;
+ H5FD_mirror_xmit_reply_t reply;
+};
+
+
+/* ---------------------------------------------------------------------------
+ * Structure: struct sock_comm
+ *
+ * Structure for placing the data read and pre-processed from Driver in an
+ * organized fashion. Useful for pre-processing a received xmit.
+ *
+ * magic (uint32_t)
+ * Semi-unique number to sanity-check structure pointer and validity.
+ * Must equal MW_SOCK_COMM_MAGIC to be valid.
+ *
+ * recd_die (int)
+ * "Boolean" flag indicating that an explicit shutdown/kill/die command
+ * was received. Potentially useful for debugging and or "manual"
+ * operation of the program.
+ * 0 indicates normal operation, non-0 (1) indicates to die.
+ *
+ * xmit_recd (H5FD_mirror_xmit_t *)
+ * Structure pointer for the "xmit header" as decoded from the raw
+ * binary stream read from the socket.
+ *
+ * raw (char *)
+ * Pointer to a raw byte array, for storing data as read from the
+ * socket. Bytes buffer is decoded into xmit_t header and derivative
+ * structures.
+ *
+ * raw_size (size_t)
+ * Give the size of the `raw` buffer.
+ *
+ * ---------------------------------------------------------------------------
+ */
+struct sock_comm {
+ uint32_t magic;
+ int recd_die;
+ H5FD_mirror_xmit_t *xmit_recd;
+ char *raw;
+ size_t raw_size;
+};
+
+
+/* ---------------------------------------------------------------------------
+ * Structure: struct mirror_writer_opts
+ *
+ * Container for default values and options as parsed from the command line.
+ * Currently rather vestigal, but may be expanded and/or moved to be set by
+ * Server and passed around as an argument.
+ *
+ * magic (uint32_t)
+ * Semi-unique number to sanity-check structure pointer and validity.
+ * Must equal MW_OPTS_MAGIC to be valid.
+ *
+ * logpath (char *)
+ * String pointer. Allocated at runtime.
+ * Specifies file location for logging output.
+ * May be NULL -- uses default output (e.g., stdout).
+ *
+ * ----------------------------------------------------------------------------
+ */
+struct mirror_writer_opts {
+ uint32_t magic;
+ char *logpath;
+};
+
+static int do_open(struct mirror_session *session,
+ const H5FD_mirror_xmit_open_t *xmit_open);
+
+
+/* ---------------------------------------------------------------------------
+ * Function: session_init
+ *
+ * Purpose: Populate mirror_session structure with default and
+ * options-drived values.
+ *
+ * Return: An allocated mirror_session structure pointer on success,
+ * else NULL.
+ * ----------------------------------------------------------------------------
+ */
+static struct mirror_session *
+session_init(struct mirror_writer_opts *opts)
+{
+ struct mirror_session *session = NULL;
+
+ mirror_log(NULL, V_INFO, "session_init()");
+
+ if (NULL == opts || opts->magic != MW_OPTS_MAGIC) {
+ mirror_log(NULL, V_ERR, "invalid opts pointer");
+ goto error;
+ }
+
+ session = (struct mirror_session *)HDmalloc(sizeof(struct mirror_session));
+ if (session == NULL) {
+ mirror_log(NULL, V_ERR, "can't allocate session structure");
+ goto error;
+ }
+
+ session->magic = MW_SESSION_MAGIC;
+ session->sockfd = -1;
+ session->xmit_count = 0;
+ session->token = 0;
+ session->file = NULL;
+
+ session->reply.pub.magic = H5FD_MIRROR_XMIT_MAGIC;
+ session->reply.pub.version = H5FD_MIRROR_XMIT_CURR_VERSION;
+ session->reply.pub.op = H5FD_MIRROR_OP_REPLY;
+ session->reply.pub.session_token = 0;
+ HDbzero(session->reply.message, H5FD_MIRROR_STATUS_MESSAGE_MAX);
+
+ /* Options-derived population
+ */
+
+ session->loginfo = mirror_log_init(opts->logpath, "W- ",
+ MIRROR_LOG_DEFAULT_VERBOSITY);
+
+ return session;
+
+error:
+ if (session) {
+ HDfree(session);
+ }
+ return NULL;
+} /* end session_init() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: session_stop
+ *
+ * Purpose: Stop and clean up a session.
+ * Only do this as part of program termination or aborting startup.
+ *
+ * Return: 0 on success, or negative sum of errors encountered.
+ * ----------------------------------------------------------------------------
+ */
+static int
+session_stop(struct mirror_session *session)
+{
+ int ret_value = 0;
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_INFO, "session_stop()");
+
+ /* Close HDF5 file if it is still open (probably in error) */
+ if (session->file) {
+ mirror_log(session->loginfo, V_WARN, "HDF5 file still open at cleanup");
+ if (H5FDclose(session->file) < 0) {
+ mirror_log(session->loginfo, V_ERR, "H5FDclose() during cleanup!");
+ ret_value--;
+ }
+ }
+
+ /* Socket will be closed by parent side of server fork after exit */
+
+ /* Close custom logging stream */
+ if (mirror_log_term(session->loginfo) < 0) {
+ mirror_log(NULL, V_ERR, "Problem closing logging stream");
+ ret_value--;
+ }
+ session->loginfo = NULL;
+
+ /* Invalidate and release structure */
+ session->magic++;
+ HDfree(session);
+
+ return ret_value;
+} /* end session_stop() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: session_start
+ *
+ * Purpose: Initiate session, open files.
+ *
+ * Return: Success: A valid mirror_session pointer which must later be
+ * cleaned up with session_stop().
+ * Failure: NULL, after cleaning up after itself.
+ * ---------------------------------------------------------------------------
+ */
+static struct mirror_session *
+session_start(int socketfd, const H5FD_mirror_xmit_open_t *xmit_open)
+{
+ struct mirror_session *session = NULL;
+ struct mirror_writer_opts opts;
+#if 0 /* TODO: behaviro option */
+ char logpath[H5FD_MIRROR_XMIT_FILEPATH_MAX] = "";
+#endif
+
+ mirror_log(NULL, V_INFO, "session_start()");
+
+ if (FALSE == H5FD_mirror_xmit_is_open(xmit_open)) {
+ mirror_log(NULL, V_ERR, "invalid OPEN xmit");
+ return NULL;
+ }
+
+ opts.magic = MW_OPTS_MAGIC;
+#if 0 /* TODO: behavior option */
+ HDsnprintf(logpath, H5FD_MIRROR_XMIT_FILEPATH_MAX, "%s.log",
+ xmit_open->filename);
+ opts.logpath = logpath;
+#else
+ opts.logpath = NULL;
+#endif
+
+ session = session_init(&opts);
+ if (NULL == session) {
+ mirror_log(NULL, V_ERR, "can't instantiate session");
+ goto error;
+ }
+
+ session->sockfd = socketfd;
+
+ if (do_open(session, xmit_open) < 0) {
+ mirror_log(NULL, V_ERR, "unable to open file");
+ goto error;
+ }
+
+ return session;
+
+error:
+ if (session != NULL) {
+ if (session_stop(session) < 0) {
+ mirror_log(NULL, V_WARN, "Can't abort session init");
+ }
+ session = NULL;
+ }
+ return NULL;
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Function: _xmit_reply
+ *
+ * Purpose: Common operations to send a reply xmit through the session.
+ *
+ * Return: 0 on success, -1 if error.
+ * ----------------------------------------------------------------------------
+ */
+static int
+_xmit_reply(struct mirror_session *session)
+{
+ unsigned char xmit_buf[H5FD_MIRROR_XMIT_REPLY_SIZE];
+ H5FD_mirror_xmit_reply_t *reply = &(session->reply);
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_ALL, "_xmit_reply()");
+
+ reply->pub.xmit_count = session->xmit_count++;
+ if (H5FD_mirror_xmit_encode_reply(xmit_buf,
+ (const H5FD_mirror_xmit_reply_t *)reply)
+ != H5FD_MIRROR_XMIT_REPLY_SIZE)
+ {
+ mirror_log(session->loginfo, V_ERR, "can't encode reply");
+ return -1;
+ }
+
+ mirror_log(session->loginfo, V_ALL, "reply xmit data\n```");
+ mirror_log_bytes(session->loginfo, V_ALL, H5FD_MIRROR_XMIT_REPLY_SIZE,
+ (const unsigned char *)xmit_buf);
+ mirror_log(session->loginfo, V_ALL, "```");
+
+ if (HDwrite(session->sockfd, xmit_buf, H5FD_MIRROR_XMIT_REPLY_SIZE) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't write reply to Driver");
+ return -1;
+ }
+
+ return 0;
+} /* end _xmit_reply() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: reply_ok
+ *
+ * Purpose: Send an OK reply through the session.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+reply_ok(struct mirror_session *session)
+{
+ H5FD_mirror_xmit_reply_t *reply = &(session->reply);
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_ALL, "reply_ok()");
+
+ reply->status = H5FD_MIRROR_STATUS_OK;
+ HDbzero(reply->message, H5FD_MIRROR_STATUS_MESSAGE_MAX);
+ return _xmit_reply(session);
+} /* end reply_ok() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: reply_error
+ *
+ * Purpose: Send an ERROR reply with message through the session.
+ * Message may be cut short if it would overflow the available
+ * buffer in the xmit.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+reply_error(struct mirror_session *session,
+ const char *msg)
+{
+ H5FD_mirror_xmit_reply_t *reply = &(session->reply);
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_ALL, "reply_error(%s)", msg);
+
+ reply->status = H5FD_MIRROR_STATUS_ERROR;
+ HDsnprintf(reply->message, H5FD_MIRROR_STATUS_MESSAGE_MAX-1, "%s", msg);
+ return _xmit_reply(session);
+} /* end reply_error() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_close
+ *
+ * Purpose: Handle an CLOSE operation.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_close(struct mirror_session *session)
+{
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_INFO, "do_close()");
+
+ if (NULL == session->file) {
+ mirror_log(session->loginfo, V_ERR, "no file to close!");
+ reply_error(session, "no file to close");
+ return -1;
+ }
+
+ if (H5FDclose(session->file) < 0) {
+ mirror_log(session->loginfo, V_ERR, "H5FDclose()");
+ reply_error(session, "H5FDclose()");
+ return -1;
+ }
+ session->file = NULL;
+
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+} /* end do_close() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_lock
+ *
+ * Purpose: Handle a LOCK operation.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_lock(struct mirror_session *session,
+ const unsigned char *xmit_buf)
+{
+ size_t decode_ret = 0;
+ H5FD_mirror_xmit_lock_t xmit_lock;
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf);
+
+ mirror_log(session->loginfo, V_INFO, "do_lock()");
+
+ decode_ret = H5FD_mirror_xmit_decode_lock(&xmit_lock, xmit_buf);
+ if (H5FD_MIRROR_XMIT_LOCK_SIZE != decode_ret) {
+ mirror_log(session->loginfo, V_ERR, "can't decode set-eoa xmit");
+ reply_error(session, "remote xmit_eoa_t decoding size failure");
+ return -1;
+ }
+
+ if (!H5FD_mirror_xmit_is_lock(&xmit_lock)) {
+ mirror_log(session->loginfo, V_ERR, "not a set-eoa xmit");
+ reply_error(session, "remote xmit_eoa_t decode failure");
+ return -1;
+ }
+ mirror_log(session->loginfo, V_INFO, "lock rw: (%d)", xmit_lock.rw);
+
+ if (H5FDlock(session->file, (hbool_t)xmit_lock.rw) < 0) {
+ mirror_log(session->loginfo, V_ERR, "H5FDlock()");
+ reply_error(session, "remote H5FDlock() failure");
+ return -1;
+ }
+
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+} /* end do_lock() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_open
+ *
+ * Purpose: Handle an OPEN operation.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_open(struct mirror_session *session,
+ const H5FD_mirror_xmit_open_t *xmit_open)
+{
+ hid_t fapl_id = H5I_INVALID_HID;
+ unsigned _flags = 0;
+ haddr_t _maxaddr = HADDR_UNDEF;
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_open &&
+ TRUE == H5FD_mirror_xmit_is_open(xmit_open));
+
+ mirror_log(session->loginfo, V_INFO, "do_open()");
+
+ if (0 != xmit_open->pub.xmit_count) {
+ mirror_log(session->loginfo, V_ERR, "open with xmit count not zero!");
+ reply_error(session, "initial transmission count not zero");
+ goto error;
+ }
+ if (0 != session->token) {
+ mirror_log(session->loginfo, V_ERR, "open with token already set!");
+ reply_error(session, "initial session token not zero");
+ goto error;
+ }
+
+ session->xmit_count = 1;
+ session->token = xmit_open->pub.session_token;
+ session->reply.pub.session_token = session->token;
+
+ _flags = (unsigned)xmit_open->flags;
+ _maxaddr = (haddr_t)xmit_open->maxaddr;
+
+ /* Check whether the native size_t on the remote machine (Driver) is larger
+ * than that on the local machine; if so, issue a warning.
+ * The blob is always an 8-byte bitfield -- check its contents.
+ */
+ if (xmit_open->size_t_blob > (uint64_t)((size_t)(-1))) {
+ mirror_log(session->loginfo, V_WARN,
+ "Driver size_t is larger than our own");
+ }
+
+ mirror_log(session->loginfo, V_INFO,
+ "to open file %s (flags %d) (maxaddr %d)",
+ xmit_open->filename, _flags, _maxaddr);
+
+ /* Explicitly use Sec2 as the underlying driver for now.
+ */
+ fapl_id = H5Pcreate(H5P_FILE_ACCESS);
+ if (fapl_id < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't create FAPL");
+ reply_error(session, "H5Pcreate() failure");
+ goto error;
+ }
+ if (H5Pset_fapl_sec2(fapl_id) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't set FAPL as Sec2");
+ reply_error(session, "H5Pset_fapl_sec2() failure");
+ goto error;
+ }
+
+ session->file = H5FDopen(xmit_open->filename, _flags, fapl_id, _maxaddr);
+ if (NULL == session->file) {
+ mirror_log(session->loginfo, V_ERR, "H5FDopen()");
+ reply_error(session, "remote H5FDopen() failure");
+ goto error;
+ }
+
+ /* FAPL is set and in use; clean up */
+ if (H5Pclose(fapl_id) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't set FAPL as Sec2");
+ reply_error(session, "H5Pset_fapl_sec2() failure");
+ goto error;
+ }
+
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+
+error:
+ if (fapl_id > 0) {
+ H5E_BEGIN_TRY {
+ (void)H5Pclose(fapl_id);
+ } H5E_END_TRY;
+ }
+ return -1;
+} /* end do_open() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_set_eoa
+ *
+ * Purpose: Handle a SET_EOA operation.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_set_eoa(struct mirror_session *session,
+ const unsigned char *xmit_buf)
+{
+ size_t decode_ret = 0;
+ H5FD_mirror_xmit_eoa_t xmit_seoa;
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf);
+
+ mirror_log(session->loginfo, V_INFO, "do_set_eoa()");
+
+ decode_ret = H5FD_mirror_xmit_decode_set_eoa(&xmit_seoa, xmit_buf);
+ if (H5FD_MIRROR_XMIT_EOA_SIZE != decode_ret) {
+ mirror_log(session->loginfo, V_ERR, "can't decode set-eoa xmit");
+ reply_error(session, "remote xmit_eoa_t decoding size failure");
+ return -1;
+ }
+
+ if (!H5FD_mirror_xmit_is_set_eoa(&xmit_seoa)) {
+ mirror_log(session->loginfo, V_ERR, "not a set-eoa xmit");
+ reply_error(session, "remote xmit_eoa_t decode failure");
+ return -1;
+ }
+
+ mirror_log(session->loginfo, V_INFO, "set EOA addr %d",
+ xmit_seoa.eoa_addr);
+
+ if (H5FDset_eoa(session->file, (H5FD_mem_t)xmit_seoa.type,
+ (haddr_t)xmit_seoa.eoa_addr)
+ < 0)
+ {
+ mirror_log(session->loginfo, V_ERR, "H5FDset_eoa()");
+ reply_error(session, "remote H5FDset_eoa() failure");
+ return -1;
+ }
+
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+} /* end do_set_eoa() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_truncate
+ *
+ * Purpose: Handle a TRUNCATE operation.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_truncate(struct mirror_session *session)
+{
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_INFO, "do_truncate()");
+
+ /* default DXPL ID (0), 0 for "FALSE" closing -- both probably unused */
+ if (H5FDtruncate(session->file, 0, 0) < 0) {
+ mirror_log(session->loginfo, V_ERR, "H5FDtruncate()");
+ reply_error(session, "remote H5FDtruncate() failure");
+ return -1;
+ }
+
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+} /* end do_truncate() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_unlock
+ *
+ * Purpose: Handle an UNLOCK operation.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_unlock(struct mirror_session *session)
+{
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_INFO, "do_unlock()");
+
+ if (H5FDunlock(session->file) < 0) {
+ mirror_log(session->loginfo, V_ERR, "H5FDunlock()");
+ reply_error(session, "remote H5FDunlock() failure");
+ return -1;
+ }
+
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+} /* end do_unlock() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: do_write
+ *
+ * Purpose: Handle a WRITE operation.
+ * Receives command, replies; receives & writes data, replies.
+ *
+ * It is known that this results in suboptimal performance,
+ * but handling both small and very, very large write buffers
+ * with a single "over the wire" exchange
+ * poses design challenges not worth tackling as of March 2020.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+do_write(struct mirror_session *session,
+ const unsigned char *xmit_buf)
+{
+ size_t decode_ret = 0;
+ haddr_t addr = 0;
+ haddr_t sum_bytes_written = 0;
+ H5FD_mem_t type = 0;
+ char *buf = NULL;
+ ssize_t nbytes_in_packet = 0;
+ H5FD_mirror_xmit_write_t xmit_write;
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf);
+
+ mirror_log(session->loginfo, V_INFO, "do_write()");
+
+ if (NULL == session->file) {
+ mirror_log(session->loginfo, V_ERR, "no open file!");
+ reply_error(session, "no file open on remote");
+ return -1;
+ }
+
+ decode_ret = H5FD_mirror_xmit_decode_write(&xmit_write, xmit_buf);
+ if (H5FD_MIRROR_XMIT_WRITE_SIZE != decode_ret) {
+ mirror_log(session->loginfo, V_ERR, "can't decode write xmit");
+ reply_error(session, "remote xmit_write_t decoding size failure");
+ return -1;
+ }
+
+ if (!H5FD_mirror_xmit_is_write(&xmit_write)) {
+ mirror_log(session->loginfo, V_ERR, "not a write xmit");
+ reply_error(session, "remote xmit_write_t decode failure");
+ return -1;
+ }
+
+ addr = (haddr_t)xmit_write.offset;
+ type = (H5FD_mem_t)xmit_write.type;
+
+ /* Allocate the buffer once -- re-use between loops.
+ */
+ buf = (char *)HDmalloc(sizeof(char) * H5FD_MIRROR_DATA_BUFFER_MAX);
+ if (NULL == buf) {
+ mirror_log(session->loginfo, V_ERR, "can't allocate databuffer");
+ reply_error(session, "can't allocate buffer for receiving data");
+ return -1;
+ }
+
+ /* got write signal; ready for data */
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ mirror_log(session->loginfo, V_INFO, "to write %zu bytes at %zu",
+ xmit_write.size,
+ addr);
+
+ /* The given write may be:
+ * 1. larger than the allowed single buffer size
+ * 2. larger than the native size_t of this system
+ *
+ * Handle all cases by looping, ingesting as much of the stream as possible
+ * and writing that part to the file.
+ */
+ sum_bytes_written = 0;
+ do {
+ nbytes_in_packet = HDread(session->sockfd, buf,
+ H5FD_MIRROR_DATA_BUFFER_MAX);
+ if (-1 == nbytes_in_packet) {
+ mirror_log(session->loginfo, V_ERR, "can't read into databuffer");
+ reply_error(session, "can't read data buffer");
+ return -1;
+ }
+
+ mirror_log(session->loginfo, V_INFO, "received %zd bytes",
+ nbytes_in_packet);
+ if (HEXDUMP_WRITEDATA) {
+ mirror_log(session->loginfo, V_ALL, "DATA:\n```");
+ mirror_log_bytes(session->loginfo, V_ALL, nbytes_in_packet,
+ (const unsigned char *)buf);
+ mirror_log(session->loginfo, V_ALL, "```");
+ }
+
+ mirror_log(session->loginfo, V_INFO, "writing %zd bytes at %zu",
+ nbytes_in_packet,
+ (addr + sum_bytes_written));
+
+ if (H5FDwrite(session->file, type, H5P_DEFAULT,
+ (addr + sum_bytes_written), (size_t)nbytes_in_packet, buf)
+ < 0)
+ {
+ mirror_log(session->loginfo, V_ERR, "H5FDwrite()");
+ reply_error(session, "remote H5FDwrite() failure");
+ return -1;
+ }
+
+ sum_bytes_written += (haddr_t)nbytes_in_packet;
+
+ } while (sum_bytes_written < xmit_write.size); /* end while ingesting */
+
+ HDfree(buf);
+
+ /* signal that we're done here and a-ok */
+ if (reply_ok(session) < 0) {
+ mirror_log(session->loginfo, V_ERR, "can't reply");
+ reply_error(session, "ok reply failed; session contaminated");
+ return -1;
+ }
+
+ return 0;
+} /* end do_write() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: receive_communique
+ *
+ * Purpose: Accept bytes from the socket, check for emergency shutdown, and
+ * sanity-check received bytes.
+ * The raw bytes read are stored in the sock_comm structure at
+ * comm->raw.
+ * The raw bytes are decoded and a xmit_t (header) struct pointer
+ * in comm is populated at comm->xmit_recd.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+receive_communique(
+ struct mirror_session *session,
+ struct sock_comm *comm)
+{
+ ssize_t read_ret = 0;
+ size_t decode_ret;
+ H5FD_mirror_xmit_t *X = comm->xmit_recd;
+
+ HDassert((session != NULL) && \
+ (session->magic == MW_SESSION_MAGIC) && \
+ (comm != NULL) && \
+ (comm->magic == MW_SOCK_COMM_MAGIC) && \
+ (comm->xmit_recd != NULL) && \
+ (comm->raw != NULL) && \
+ (comm->raw_size >= H5FD_MIRROR_XMIT_BUFFER_MAX) );
+
+ mirror_log(session->loginfo, V_INFO, "receive_communique()");
+
+ HDbzero(comm->raw, comm->raw_size);
+ comm->recd_die = 0;
+
+ mirror_log(session->loginfo, V_INFO, "ready to receive"); /* TODO */
+
+ read_ret = HDread(session->sockfd, comm->raw, H5FD_MIRROR_XMIT_BUFFER_MAX);
+ if (-1 == read_ret) {
+ mirror_log(session->loginfo, V_ERR, "read:%zd", read_ret);
+ goto error;
+ }
+
+ mirror_log(session->loginfo, V_INFO, "received %zd bytes", read_ret);
+ if (HEXDUMP_XMITS) {
+ mirror_log(session->loginfo, V_ALL, "```", read_ret);
+ mirror_log_bytes(session->loginfo, V_ALL, (size_t)read_ret,
+ (const unsigned char *)comm->raw);
+ mirror_log(session->loginfo, V_ALL, "```");
+ } /* end if hexdump transmissions received */
+
+ /* old-fashioned manual kill (for debugging) */
+ if (!HDstrncmp("GOODBYE", comm->raw, 7)) {
+ mirror_log(session->loginfo, V_INFO, "received GOODBYE");
+ comm->recd_die = 1;
+ goto done;
+ }
+
+ decode_ret = H5FD_mirror_xmit_decode_header(X,
+ (const unsigned char *)comm->raw);
+ if (H5FD_MIRROR_XMIT_HEADER_SIZE != decode_ret) {
+ mirror_log(session->loginfo, V_ERR,
+ "header decode size mismatch: expected (%z), got (%z)",
+ H5FD_MIRROR_XMIT_HEADER_SIZE, decode_ret);
+ /* Try to tell Driver that it should stop */
+ reply_error(session, "xmit size mismatch");
+ goto error;
+ }
+
+ if (!H5FD_mirror_xmit_is_xmit(X)) {
+ mirror_log(session->loginfo, V_ERR, "bad magic: 0x%X", X->magic);
+ /* Try to tell Driver that it should stop */
+ reply_error(session, "bad magic");
+ goto error;
+ }
+
+ if (session->xmit_count != X->xmit_count) {
+ mirror_log(session->loginfo, V_ERR,
+ "xmit_count mismatch exp:%d recd:%d",
+ session->xmit_count, X->xmit_count);
+ /* Try to tell Driver that it should stop */
+ reply_error(session, "xmit_count mismatch");
+ goto error;
+ }
+
+ if ( (session->token > 0) && (session->token != X->session_token) ) {
+ mirror_log(session->loginfo, V_ERR, "wrong session");
+ /* Try to tell Driver that it should stop */
+ reply_error(session, "wrong session");
+ goto error;
+ }
+
+ session->xmit_count++;
+
+done:
+ return 0;
+
+error:
+ return -1;
+} /* end receive_communique() */
+
+
+/* ---------------------------------------------------------------------------
+ * Function: process_instructions
+ *
+ * Purpose: Receive and handle all instructions from Driver.
+ *
+ * Return: 0 on success, -1 if error.
+ * ---------------------------------------------------------------------------
+ */
+static int
+process_instructions(struct mirror_session *session)
+{
+ struct sock_comm comm;
+ char xmit_buf[H5FD_MIRROR_XMIT_BUFFER_MAX]; /* raw bytes */
+ H5FD_mirror_xmit_t xmit_recd; /* for decoded xmit header */
+
+ HDassert(session && (session->magic == MW_SESSION_MAGIC));
+
+ mirror_log(session->loginfo, V_INFO, "process_instructions()");
+
+ comm.magic = MW_SOCK_COMM_MAGIC;
+ comm.recd_die = 0; /* Flag for program to terminate */
+ comm.xmit_recd = &xmit_recd;
+ comm.raw = xmit_buf;
+ comm.raw_size = sizeof(xmit_buf);
+
+ while (1) { /* sill-listening infinite loop */
+
+ /* Use convenience structure for raw/decoded info in/out */
+ if (receive_communique(session, &comm) < 0) {
+ mirror_log(session->loginfo, V_ERR, "problem reading socket");
+ return -1;
+ }
+
+ if (comm.recd_die) {
+ goto done;
+ }
+
+ switch(xmit_recd.op) {
+ case H5FD_MIRROR_OP_CLOSE:
+ if (do_close(session) < 0) {
+ return -1;
+ }
+ goto done;
+ case H5FD_MIRROR_OP_LOCK:
+ if (do_lock(session, (const unsigned char *)xmit_buf) < 0) {
+ return -1;
+ }
+ break;
+ case H5FD_MIRROR_OP_OPEN:
+ mirror_log(session->loginfo, V_ERR, "OPEN xmit during session");
+ reply_error(session, "illegal OPEN xmit during session");
+ return -1;
+ case H5FD_MIRROR_OP_SET_EOA:
+ if (do_set_eoa(session, (const unsigned char *)xmit_buf) < 0) {
+ return -1;
+ }
+ break;
+ case H5FD_MIRROR_OP_TRUNCATE:
+ if (do_truncate(session) < 0) {
+ return -1;
+ }
+ break;
+ case H5FD_MIRROR_OP_UNLOCK:
+ if (do_unlock(session) < 0) {
+ return -1;
+ }
+ break;
+ case H5FD_MIRROR_OP_WRITE:
+ if (do_write(session, (const unsigned char *)xmit_buf) < 0) {
+ return -1;
+ }
+ break;
+ default:
+ mirror_log(session->loginfo, V_ERR, "unrecognized transmission");
+ reply_error(session, "unrecognized transmission");
+ return -1;
+ } /* end switch (xmit_recd.op) */
+
+ } /* end while still listening */
+
+done:
+ comm.magic = 0; /* invalidate structure, on principle */
+ xmit_recd.magic = 0; /* invalidate structure, on principle */
+ return 0;
+} /* end process_instructions() */
+
+
+/* ------------------------------------------------------------------------- */
+herr_t
+run_writer(int socketfd, H5FD_mirror_xmit_open_t *xmit_open)
+{
+ struct mirror_session *session = NULL;
+ int ret_value = SUCCEED;
+
+ session = session_start(socketfd, xmit_open);
+ if (NULL == session) {
+ mirror_log(NULL, V_ERR, "Can't start session -- aborting");
+ ret_value = FAIL;
+ }
+ else {
+ if (process_instructions(session) < 0) {
+ mirror_log(session->loginfo, V_ERR,
+ "problem processing instructions");
+ ret_value = FAIL;
+ }
+ if (session_stop(session) < 0) {
+ mirror_log(NULL, V_ERR, "Can't stop session -- going down hard");
+ ret_value = FAIL;
+ }
+ }
+
+ return ret_value;
+} /* end run_writer */
+
+#endif /* H5_HAVE_MIRROR_VFD */
+