diff options
author | Jacob Smith <jake.smith@hdfgroup.org> | 2020-03-13 22:13:17 (GMT) |
---|---|---|
committer | David Young <dyoung@hdfgroup.org> | 2020-05-20 14:34:20 (GMT) |
commit | b11015c4ccec9384e516ca13a0a56b209b71a578 (patch) | |
tree | d140c28354788589a09ae8d6cdec116bdc5b9646 /utils | |
parent | 333388c744626b23791298be9f9165fa5bc9d037 (diff) | |
download | hdf5-b11015c4ccec9384e516ca13a0a56b209b71a578.zip hdf5-b11015c4ccec9384e516ca13a0a56b209b71a578.tar.gz hdf5-b11015c4ccec9384e516ca13a0a56b209b71a578.tar.bz2 |
Add Splitter VFD to library.
* "Simultaneous and equivalent" Read-Write and Write-Only channels for
file I/O.
* Only supports drivers with the H5FD_FEAT_DEFAULT_VFD_COMPATIBLE flag for
now, preventing issues with multi-file drivers.
Add Mirror VFD to library.
* Write-only operations over a network.
* Uses TCP/IP sockets.
* Server and auxiliary server-shutdown programs provided in a new directory,
`utils/mirror_vfd`.
* Automated testing via loopback ("remote" of localhost).
Diffstat (limited to 'utils')
-rw-r--r-- | utils/CMakeLists.txt | 4 | ||||
-rw-r--r-- | utils/COPYING | 12 | ||||
-rw-r--r-- | utils/Makefile.am | 26 | ||||
-rw-r--r-- | utils/mirror_vfd/CMakeLists.txt | 64 | ||||
-rw-r--r-- | utils/mirror_vfd/Makefile.am | 30 | ||||
-rw-r--r-- | utils/mirror_vfd/mirror_remote.c | 225 | ||||
-rw-r--r-- | utils/mirror_vfd/mirror_remote.h | 51 | ||||
-rw-r--r-- | utils/mirror_vfd/mirror_server.c | 645 | ||||
-rw-r--r-- | utils/mirror_vfd/mirror_server_halten_sie.c | 214 | ||||
-rw-r--r-- | utils/mirror_vfd/mirror_writer.c | 1064 |
10 files changed, 2335 insertions, 0 deletions
diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt new file mode 100644 index 0000000..2d5626e --- /dev/null +++ b/utils/CMakeLists.txt @@ -0,0 +1,4 @@ +cmake_minimum_required (VERSION 3.10) +project (HDF5_UTILS C) + +add_subdirectory (mirror_vfd) diff --git a/utils/COPYING b/utils/COPYING new file mode 100644 index 0000000..4a23112 --- /dev/null +++ b/utils/COPYING @@ -0,0 +1,12 @@ + + Copyright by The HDF Group. + All rights reserved. + + The files and subdirectories in this directory are part of HDF5. + The full HDF5 copyright notice, including terms governing use, + modification, and redistribution, is contained in the COPYING file + which can be found at the root of the source code distribution tree + or in https://support.hdfgroup.org/ftp/HDF5/releases. If you do + not have access to either file, you may request a copy from + help@hdfgroup.org. + diff --git a/utils/Makefile.am b/utils/Makefile.am new file mode 100644 index 0000000..b4409ac --- /dev/null +++ b/utils/Makefile.am @@ -0,0 +1,26 @@ +# +# Copyright by The HDF Group. +# All rights reserved. +# +# This file is part of HDF5. The full HDF5 copyright notice, including +# terms governing use, modification, and redistribution, is contained in +# the COPYING file, which can be found at the root of the source code +# distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. +# If you do not have access to either file, you may request a copy from +# help@hdfgroup.org. +## +## Makefile.am +## Run automake to generate a Makefile.in from this file. +## +# +# Utils HDF5 Makefile(.in) +# + +include $(top_srcdir)/config/commence.am + +CONFIG=ordered + +# All subdirectories +SUBDIRS=mirror_vfd + +include $(top_srcdir)/config/conclude.am diff --git a/utils/mirror_vfd/CMakeLists.txt b/utils/mirror_vfd/CMakeLists.txt new file mode 100644 index 0000000..405c420 --- /dev/null +++ b/utils/mirror_vfd/CMakeLists.txt @@ -0,0 +1,64 @@ +cmake_minimum_required (VERSION 3.10) +project (HDF5_UTILS_MIRRORVFD C) + +#----------------------------------------------------------------------------- +# Add the mirror_server executable +#----------------------------------------------------------------------------- + +set (mirror_server_SOURCES + ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_remote.c + ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_server.c + ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_writer.c) +add_executable (mirror_server ${mirror_server_SOURCES}) +target_include_directories (mirror_server PRIVATE "${HDF5_UITLS_DIR};${HDF5_SRC_DIR};${HDF5_BINARY_DIR};$<$<BOOL:${HDF5_ENABLE_PARALLEL}>:${MPI_C_INCLUDE_DIRS}>") +if (NOT BUILD_SHARED_LIBS) + TARGET_C_PROPERTIES (mirror_server STATIC) + target_link_libraries (mirror_server PRIVATE ${HDF5_TOOLS_LIB_TARGET} ${HDF5_LIB_TARGET}) +else () + TARGET_C_PROPERTIES (mirror_server SHARED) + target_link_libraries (mirror_server PRIVATE ${HDF5_TOOLS_LIBSH_TARGET} ${HDF5_LIBSH_TARGET}) +endif () +set_target_properties (mirror_server PROPERTIES FOLDER utils) +set_global_variable (HDF5_UTILS_TO_EXPORT "${HDF5_UTILS_TO_EXPORT};mirror_server") +set (H5_DEP_EXECUTABLES ${H5_DEP_EXECUTABLES} mirror_server) + +#----------------------------------------------------------------------------- +# Add the mirror_server_halten_sie executable +#----------------------------------------------------------------------------- + +set (mirror_server_halt_SOURCES ${HDF5_UTILS_MIRRORVFD_SOURCE_DIR}/mirror_server_halten_sie.c) +add_executable (mirror_server_halt ${mirror_server_halt_SOURCES}) +target_include_directories (mirror_server_halt PRIVATE "${HDF5_UITLS_DIR};${HDF5_SRC_DIR};${HDF5_BINARY_DIR};$<$<BOOL:${HDF5_ENABLE_PARALLEL}>:${MPI_C_INCLUDE_DIRS}>") +if (NOT BUILD_SHARED_LIBS) + TARGET_C_PROPERTIES (mirror_server_halt STATIC) + target_link_libraries (mirror_server_halt PRIVATE ${HDF5_TOOLS_LIB_TARGET} ${HDF5_LIB_TARGET}) +else () + TARGET_C_PROPERTIES (mirror_server_halt SHARED) + target_link_libraries (mirror_server_halt PRIVATE ${HDF5_TOOLS_LIBSH_TARGET} ${HDF5_LIBSH_TARGET}) +endif () +set_target_properties (mirror_server_halt PROPERTIES FOLDER utils) +set_global_variable (HDF5_UTILS_TO_EXPORT "${HDF5_UTILS_TO_EXPORT};mirror_server_halt") +set (H5_DEP_EXECUTABLES ${H5_DEP_EXECUTABLES} mirror_server_halt) + +############################################################################## +############################################################################## +### I N S T A L L A T I O N ### +############################################################################## +############################################################################## + +#----------------------------------------------------------------------------- +# Rules for Installation of tools using make Install target +#----------------------------------------------------------------------------- +if (HDF5_EXPORTED_TARGETS) + foreach (exec ${H5_DEP_EXECUTABLES}) + INSTALL_PROGRAM_PDB (${exec} ${HDF5_INSTALL_BIN_DIR} utilsapplications) + endforeach () + + install ( + TARGETS + ${H5_DEP_EXECUTABLES} + EXPORT + ${HDF5_EXPORTED_TARGETS} + RUNTIME DESTINATION ${HDF5_INSTALL_BIN_DIR} COMPONENT utilsapplications + ) +endif () diff --git a/utils/mirror_vfd/Makefile.am b/utils/mirror_vfd/Makefile.am new file mode 100644 index 0000000..448e4cd --- /dev/null +++ b/utils/mirror_vfd/Makefile.am @@ -0,0 +1,30 @@ +# +# Copyright by The HDF Group. +# All rights reserved. +# +# This file is part of HDF5. The full HDF5 copyright notice, including +# terms governing use, modification, and redistribution, is contained in +# the COPYING file, which can be found at the root of the source code +# distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. +# If you do not have access to either file, you may request a copy from +# help@hdfgroup.org. +## +## Makefile.am +## Run automake to generate a Makefile.in from this file. +# +# HDF5 Library Makefile(.in) +# + +include $(top_srcdir)/config/commence.am + +AM_CPPFLAGS+=-I$(top_srcdir)/src + +bin_PROGRAMS = mirror_server mirror_server_halten_sie + +mirror_server_SOURCES = mirror_server.c mirror_writer.c mirror_remote.c +#mirror_writer_SOURCES = mirror_writer.c mirror_remote.c + +# All programs depend on the hdf5 library +LDADD=$(LIBHDF5) + +include $(top_srcdir)/config/conclude.am diff --git a/utils/mirror_vfd/mirror_remote.c b/utils/mirror_vfd/mirror_remote.c new file mode 100644 index 0000000..81a3625 --- /dev/null +++ b/utils/mirror_vfd/mirror_remote.c @@ -0,0 +1,225 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* Common operations for "remote" processes for the Mirror VFD. + * + * Jacob Smith, 2020-03-06 + */ + +#include "mirror_remote.h" + +#ifdef H5_HAVE_MIRROR_VFD + + +/* --------------------------------------------------------------------------- + * Function: mirror_log + * + * Purpose: Write message to the logging stream/file. + * If logging info pointer is NULL, uses logging defaults. + * ---------------------------------------------------------------------------- + */ +void +mirror_log(struct mirror_log_info *info, + unsigned int level, + const char *format, + ...) +{ + FILE *stream = MIRROR_LOG_DEFAULT_STREAM; + unsigned int verbosity = MIRROR_LOG_DEFAULT_VERBOSITY; + hbool_t custom = FALSE; + + if (info != NULL && info->magic == MIRROR_LOG_INFO_MAGIC) { + stream = info->stream; + verbosity = info->verbosity; + custom = TRUE; + } + + if (level == V_NONE) { + return; + } + else + if (level <= verbosity) { + if (custom == TRUE && info->prefix[0] != '\0') { + HDfprintf(stream, "%s", info->prefix); + } + + switch (level) { + case (V_ERR) : + HDfprintf(stream, "ERROR "); + break; + case (V_WARN) : + HDfprintf(stream, "WARNING "); + break; + default: + break; + } + + if (format != NULL) { + va_list args; + HDva_start(args, format); + HDvfprintf(stream, format, args); + HDva_end(args); + } + + HDfprintf(stream, "\n"); + HDfflush(stream); + } /* end if sufficiently verbose to print */ +} /* end mirror_log() */ + + +/* --------------------------------------------------------------------------- + * Function: session_log_bytes + * + * Purpose: "Pretty-print" raw binary data to logging stream/file. + * If info pointer is NULL, uses logging defaults. + * ---------------------------------------------------------------------------- + */ +void +mirror_log_bytes(struct mirror_log_info *info, + unsigned int level, + size_t n_bytes, + const unsigned char *buf) +{ + FILE *stream = MIRROR_LOG_DEFAULT_STREAM; + unsigned int verbosity = MIRROR_LOG_DEFAULT_VERBOSITY; + + if (buf == NULL) { + return; + } + + if (info != NULL && info->magic == MIRROR_LOG_INFO_MAGIC) { + stream = info->stream; + verbosity = info->verbosity; + } + + if (level <= verbosity) { + size_t bytes_written = 0; + const unsigned char *b = NULL; + + /* print whole lines */ + while ((n_bytes - bytes_written) >= 32) { + b = buf + bytes_written; /* point to region in buffer */ + HDfprintf(stream, + "%04zX %02X%02X%02X%02X %02X%02X%02X%02X" \ + " %02X%02X%02X%02X %02X%02X%02X%02X" \ + " %02X%02X%02X%02X %02X%02X%02X%02X" \ + " %02X%02X%02X%02X %02X%02X%02X%02X\n", + bytes_written, + b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], + b[8], b[9], b[10],b[11], b[12],b[13],b[14],b[15], + b[16],b[17],b[18],b[19], b[20],b[21],b[22],b[23], + b[24],b[25],b[26],b[27], b[28],b[29],b[30],b[31]); + bytes_written += 32; + } + + /* start partial line */ + if (n_bytes > bytes_written) { + HDfprintf(stream, "%04zX ", bytes_written); + } + + /* partial line blocks */ + while ((n_bytes - bytes_written) >= 4) { + HDfprintf(stream, " %02X%02X%02X%02X", + buf[bytes_written], buf[bytes_written+1], + buf[bytes_written+2], buf[bytes_written+3]); + bytes_written += 4; + } + + /* block separator before partial block */ + if (n_bytes > bytes_written) { + HDfprintf(stream, " "); + } + + /* partial block individual bytes */ + while (n_bytes > bytes_written) { + HDfprintf(stream, "%02X", buf[bytes_written++]); + } + + /* end partial line */ + HDfprintf(stream, "\n"); + } /* end if suitably verbose to log */ +} /* end mirror_log_bytes() */ + + +/* --------------------------------------------------------------------------- + * Function: mirror_log_init + * + * Purpose: Prepare a loginfo_t structure for use. + * + * Return: Success: Pointer to newly-ceated info. + * Failure: NULL. Either unable to allocate or cannot open file. + * ---------------------------------------------------------------------------- + */ +loginfo_t * +mirror_log_init(char *path, char *prefix, unsigned int verbosity) +{ + loginfo_t *info = NULL; + + info = (loginfo_t *)HDmalloc(sizeof(loginfo_t)); + if (info != NULL) { + info->magic = MIRROR_LOG_INFO_MAGIC; + info->verbosity = verbosity; + info->stream = MIRROR_LOG_DEFAULT_STREAM; + info->prefix[0] = '\0'; + + if (prefix && *prefix) { + HDstrncpy(info->prefix, prefix, MIRROR_LOG_PREFIX_MAX); + } + + if (path && *path) { + FILE *f = NULL; + f = HDfopen(path, "w"); + if (NULL == f) { + HDfprintf(MIRROR_LOG_DEFAULT_STREAM, + "WARN custom logging path could not be opened: %s\n", + path); + info->magic += 1; + HDfree(info); + } + else { + info->stream = f; + } + } + + } /* end if able to allocate */ + + return info; +} /* end mirror_log_init() */ + + +/* --------------------------------------------------------------------------- + * Function: mirror_log_term + * + * Purpose: Shut down and clean up a loginfo_t structure. + * + * Return: Success: SUCCEED. Resources released. + * Failure: FAIL. Indeterminite state. + * ---------------------------------------------------------------------------- + */ +herr_t +mirror_log_term(loginfo_t *info) +{ + if (info == NULL || info->magic != MIRROR_LOG_INFO_MAGIC) { + return FAIL; + } + if (info->stream != stderr || info->stream != stdout) { + if (HDfclose(info->stream) < 0) { + return FAIL; + } + } + info->magic += 1; + HDfree(info); + return SUCCEED; +} /* end mirror_log_term() */ + +#endif /* H5_HAVE_MIRROR_VFD */ + diff --git a/utils/mirror_vfd/mirror_remote.h b/utils/mirror_vfd/mirror_remote.h new file mode 100644 index 0000000..9132d51 --- /dev/null +++ b/utils/mirror_vfd/mirror_remote.h @@ -0,0 +1,51 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* Common definitions for "remote" processes for the Mirror VFD. + * + * Jacob Smith, 2020-03-06 + */ + +#include "hdf5.h" +#include "H5private.h" + +#ifdef H5_HAVE_MIRROR_VFD + +#define V_NONE 0 +#define V_ERR 1 +#define V_WARN 2 +#define V_INFO 3 +#define V_ALL 4 + +#define MIRROR_LOG_DEFAULT_STREAM stdout +#define MIRROR_LOG_DEFAULT_VERBOSITY V_WARN +#define MIRROR_LOG_PREFIX_MAX 79 +#define MIRROR_LOG_INFO_MAGIC 0x569D589A + +typedef struct mirror_log_info { + uint32_t magic; + FILE *stream; + unsigned int verbosity; + char prefix[MIRROR_LOG_PREFIX_MAX+1]; +} loginfo_t; + +void mirror_log(loginfo_t *info, unsigned int level, + const char *format, ...); +void mirror_log_bytes(loginfo_t *info, unsigned int level, + size_t n_bytes, const unsigned char *buf); +loginfo_t *mirror_log_init(char *path, char *prefix, unsigned int verbosity); +int mirror_log_term(loginfo_t *loginfo); + +herr_t run_writer(int socketfd, H5FD_mirror_xmit_open_t *xmit_open); + +#endif /* H5_HAVE_MIRROR_VFD */ + diff --git a/utils/mirror_vfd/mirror_server.c b/utils/mirror_vfd/mirror_server.c new file mode 100644 index 0000000..314f067 --- /dev/null +++ b/utils/mirror_vfd/mirror_server.c @@ -0,0 +1,645 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* + * "Server" application to associate a Mirror VFD Driver with a Writer. + * + * Server waits on a dedicated port for a Driver to attempt to connect. + * When connection is made, reads a message from the Driver. + * If message is "SHUTDOWN", Server closes connection and terminates. + * Else, if it receives an encoded OPEN xmit (from Driver), the Server forks + * itself; the child becomes a dedicated Writer and maintains connection with + * the Driver instance, and the parent process remains a Server and returns + * to listening for incoming requests. + * Else, the message is not recognized and is ignored. + * + * + * + * mirror_server [args] + * + * Primary server for coordinating mirror VFD connections with the remote + * process. + * + * args: + * --help, -h Print help message and exit. + * --port=N Positive integer for primary listening port. + * --verbosity=N Debugging verbosity + * 0: none + * 1: errors + * 2: details + * 3: all + * --logpath=S File path to direct debugging output, if any. + * Default of none prints output to stdout. + * + */ + +#include "mirror_remote.h" + +#ifdef H5_HAVE_MIRROR_VFD + +#define MAXBUF 2048 /* max buffer length. */ +#define LISTENQ 80 /* max pending mirrorS requests */ +#define DEFAULT_PORT 3000 /* default listening port */ +#define MAX_PORT_LOOPS 20 /* max iteratations through port range */ +#define PORT_LOOP_RETRY_DELAY 1 /* seconds to wait between port scans */ + +/* semi-unique "magic" numbers to sanity-check structure pointers */ +#define OP_ARGS_MAGIC 0xCF074379u +#define SERVER_RUN_MAGIC 0x741B459Au + + +/* --------------------------------------------------------------------------- + * Structure: struct op_args + * + * Purpose: Convenience structure for holding arguments from command-line. + * + * `magic` (uint32_t) + * Semi-unique number to help validate a pointer to this struct type. + * Must be OP_ARGS_MAGIC to be considered valid. + * + * `help` (int) + * Flag that the help argument was present in the command line. + * + * `main_port` (int) + * Flag that the help argument was present in the command line. + * + * `verbosity` (int) + * Number between 0 (none) and 4 (all) that controls how much detail + * the program prints as a course of logging. + * + * `log_prepend_serv` (int) + * Flag that the logging messages should have 'S- ' at the start of each + * line. + * + * `log_prepend_type` (int) + * Flag that the logging messages should have the assocaited verbosity + * level present in the line (e.g., "WARN", "ERROR", or "INFO"). + * + * `log_path` (char *) + * Path string from the command line, giving the absolute path + * for the file for logging output. Can be empty. + * + * `writer_log_path` (char *) + * Path string from the command line, giving the absolute path + * for the file for writer's logging output. Can be empty. + * + * --------------------------------------------------------------------------- + */ +struct op_args { + uint32_t magic; + int help; + int main_port; + int verbosity; + int log_prepend_serv; + int log_prepend_type; + char log_path[PATH_MAX+1]; + char writer_log_path[PATH_MAX+1]; +}; + + +/* --------------------------------------------------------------------------- + * Structure: struct server_run + * + * Purpose: Convenience structure for holding information about a server + * in operation. + * + * `magic` (uint32_t) + * Semi-unique number to help validate a pointer to this struct type. + * Must be SERVER_RUN_MAGIC to be considered valid. + * + * `log_stream` (FILE *) + * File handle where logging output is directed. + * By default, is stdout. + * + * `opts` (struct opt_args) + * Contained structure, holds the server's configuration. + * + * `listenfd` (int) + * File descriptor of the listening socket. + * + * --------------------------------------------------------------------------- + */ +struct server_run { + uint32_t magic; + struct op_args opts; + struct mirror_log_info *loginfo; + int listenfd; +}; + + +/* --------------------------------------------------------------------------- + * Function: usage + * + * Purpose: Print the usage message to stdout. + * --------------------------------------------------------------------------- + */ +static void +usage(void) +{ + HDfprintf(stdout, + "mirror_server [options]\n" \ + "\n" \ + "Application for providing Mirror Writer process to " \ + " Mirror VFD on file-open.\n" \ + "Listens on a dedicated socket; forks as a Writer upon receipt" \ + " of a valid OPEN xmit.\n" \ + "\n" \ + "Options:\n" \ + "--help [-h] : Print this help message and quit.\n" \ + "--logpath=PATH : File path for logging output " \ + "(default none, to stdout).\n" \ + "--port=PORT : Primary port (default %d).\n" \ + "--verbosity=NUM : Debug printing level " \ + "0..4, (default %d).\n", + DEFAULT_PORT, + MIRROR_LOG_DEFAULT_VERBOSITY); +} /* end usage() */ + + +/* --------------------------------------------------------------------------- + * Function: parse_args + * + * Purpose: Read command line options and store results in args_out + * structure. Fails in event of unrecognized option. + * + * Return: 0 on success, -1 on failure. + * --------------------------------------------------------------------------- + */ +static int +parse_args(int argc, char **argv, struct op_args *args_out) +{ + int i; + + /* preset default values + */ + args_out->main_port = DEFAULT_PORT; + args_out->help = 0; + args_out->log_prepend_serv = 1; + args_out->log_prepend_type = 1; + args_out->verbosity = MIRROR_LOG_DEFAULT_VERBOSITY; + /* preset empty strings */ + HDbzero(args_out->log_path, PATH_MAX+1); + HDbzero(args_out->writer_log_path, PATH_MAX+1); + + if (argv == NULL || *argv == NULL) { + mirror_log(NULL, V_ERR, "invalid argv pointer"); + return -1; + } + + /* Loop over arguments after program name and writer_path */ + for (i=2; i < argc; i++) { + if (!HDstrncmp(argv[i], "-h", 3) || !HDstrncmp(argv[i], "--help", 7)) { + mirror_log(NULL, V_INFO, "found help argument"); + args_out->help = 1; + return 0; + } /* end if help */ + else + if (!HDstrncmp(argv[i], "--port=", 7)) { + mirror_log(NULL, V_INFO, "parsing 'main_port' (%s)", argv[i]+7); + args_out->main_port = HDatoi(argv[i]+7); + } /* end if port */ + else + if (!HDstrncmp(argv[i], "--verbosity=", 12)) { + mirror_log(NULL, V_INFO, "parsing 'verbosity' (%s)", argv[i]+12); + args_out->verbosity = HDatoi(argv[i]+12); + } /* end if verbosity */ + else + if (!HDstrncmp(argv[i], "--logpath=", 10)) { + mirror_log(NULL, V_INFO, "parsing 'logpath' (%s)", argv[i]+10); + HDstrncpy(args_out->log_path, argv[i]+10, PATH_MAX); + } /* end if logpath */ + else { + mirror_log(NULL, V_ERR, "unrecognized argument: %s", argv[i]); + return -1; + } /* end if unrecognized argument */ + } /* end for each arg after the path to writer "receiver process" */ + + mirror_log(NULL, V_INFO, "all args parsed"); + + return 0; +} /* end parse_args() */ + + +/* --------------------------------------------------------------------------- + * Function: prepare_listening_socket + * + * Purpose: Configure and open a socket. + * In event of error, attempts to undo its processes. + * + * Return: Success: non-negative (the file descriptor of the socket) + * Failure: -1 + * --------------------------------------------------------------------------- + */ +static int +prepare_listening_socket(struct server_run *run) +{ + struct sockaddr_in server_addr; + int _true = 1; /* needed for setsockopt() */ + int ret_value = -1; + int ret = 0; /* for checking return value of function calls */ + + if (run == NULL || run->magic != SERVER_RUN_MAGIC) { + mirror_log(NULL, V_ERR, "invalid server_run pointer"); + return -1; + } + + mirror_log(run->loginfo, V_INFO, "preparing socket"); + + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = HDhtonl(INADDR_ANY); + server_addr.sin_port = HDhtons((uint16_t)run->opts.main_port); + + mirror_log(run->loginfo, V_INFO, "socket()"); + ret_value = HDsocket(AF_INET, SOCK_STREAM, 0); + if (ret_value < 0) { + mirror_log(run->loginfo, V_ERR, "listening socket:%d", ret_value); + goto error; + } + + mirror_log(run->loginfo, V_ALL, "setsockopt()"); + HDsetsockopt(ret_value, SOL_SOCKET, SO_REUSEADDR, &_true, sizeof(int)); + + mirror_log(run->loginfo, V_INFO, "bind()"); + ret = HDbind(ret_value, (struct sockaddr *)&server_addr, + sizeof(server_addr)); + if (ret < 0) { + mirror_log(run->loginfo, V_ERR, "bind() %s", HDstrerror(errno)); + goto error; + } + + mirror_log(run->loginfo, V_INFO, "listen()"); + ret = HDlisten(ret_value, LISTENQ); + if (ret < 0) { + mirror_log(run->loginfo, V_ERR, "H5FD server listen:%d", ret); + goto error; + } + + return ret_value; + +error: + if (ret_value >= 0) { + HDshutdown(ret_value, SHUT_RDWR); + HDclose(ret_value); + } + return -1; +} /* end prepare_listening_socket() */ + + +/* --------------------------------------------------------------------------- + * Function: init_server_run + * + * Purpose: Set up server_run struct with default and specified values. + * + * Return: Zero (0) if successful, -1 if an error occurred. + * --------------------------------------------------------------------------- + */ +static struct server_run * +init_server_run(int argc, char **argv) +{ + struct server_run *run; + + run = (struct server_run *)HDmalloc(sizeof(struct server_run)); + if (run == NULL) { + mirror_log(NULL, V_ERR, "can't allocate server_run struct"); + return NULL; + } + + run->magic = (uint32_t)SERVER_RUN_MAGIC; + run->opts.magic = (uint32_t)OP_ARGS_MAGIC; + run->listenfd = -1; + + if (parse_args(argc, argv, &(run->opts)) < 0) { + mirror_log(NULL, V_ERR, "can't parse arguments"); + usage(); + goto error; + } + + if (run->opts.help) { + usage(); + return run; /* early exit */ + } + + run->loginfo = mirror_log_init(run->opts.log_path, "s- ", + run->opts.verbosity); + + run->listenfd = prepare_listening_socket(run); + if (run->listenfd < 0) { + mirror_log(NULL, V_ERR, "can't prepare listening socket"); + goto error; + } + + return run; + +error: + if (run != NULL) { + HDfree(run); + } + return NULL; + +} /* end init_server_run() */ + + +/* --------------------------------------------------------------------------- + * Function: term_server_run + * + * Purpose: Close opened items in a sever_run and release the pointer. + * + * Return: Zero (0) if successful, -1 if an error occurred. + * --------------------------------------------------------------------------- + */ +static int +term_server_run(struct server_run *run) +{ + if (run == NULL || run->magic != SERVER_RUN_MAGIC) { + mirror_log(NULL, V_ERR, "invalid server_run pointer"); + return -1; + } + + mirror_log(run->loginfo, V_INFO, "shutting down"); + + if (run->listenfd >= 0) { + HDshutdown(run->listenfd, SHUT_RDWR); /* TODO: error-checking? */ + HDclose(run->listenfd); /* TODO: error-checking? */ + run->listenfd = -1; + } + + if (mirror_log_term(run->loginfo) < 0) { + mirror_log(NULL, V_ERR, "can't close logging stream"); + return -1; /* doesn't solve the problem, but informs of error */ + } + run->loginfo = NULL; + + (run->magic)++; + (run->opts.magic)++; + HDfree(run); + return 0; +} /* end term_server_run() */ + + +/* --------------------------------------------------------------------------- + * Function: accept_connection + * + * Purpose: Main working loop; process requests as they are received. + * Does nothing if the run option help is set. + * + * Return: -1 on error, else a non-negative file descriptor of the socket. + * --------------------------------------------------------------------------- + */ +static int +accept_connection(struct server_run *run) +{ + struct sockaddr_in client_addr; /**/ + socklen_t clilen; /**/ + struct hostent *host_port = NULL; /**/ + char *hostaddrp; /**/ + int connfd = -1; /* connection file descriptor */ + + if (run == NULL || run->magic != SERVER_RUN_MAGIC) { + mirror_log(NULL, V_ERR, "invalid server_run pointer"); + return -1; + } + + /*------------------------------*/ + /* accept a connection on a socket */ + clilen = sizeof(client_addr); + connfd = HDaccept(run->listenfd, (struct sockaddr *)&client_addr, &clilen); + if (connfd < 0) { + mirror_log(run->loginfo, V_ERR, "accept:%d", connfd); + goto error; + } + mirror_log(run->loginfo, V_INFO, "connection achieved"); + + /*------------------------------*/ + /* get client address information */ + host_port = HDgethostbyaddr( + (const char *)&client_addr.sin_addr.s_addr, + sizeof(client_addr.sin_addr.s_addr), + AF_INET); + if (host_port == NULL) { + mirror_log(run->loginfo, V_ERR, "gethostbyaddr()"); + goto error; + } + + /* function has the string space statically scoped -- OK until next call */ + hostaddrp = HDinet_ntoa(client_addr.sin_addr); + /* TODO? proper error-checking */ + + mirror_log(run->loginfo, V_INFO, + "server connected with %s (%s)", + host_port->h_name, + hostaddrp); + + return connfd; + +error: + if (connfd >= 0) { + close(connfd); + } + return -1; +} /* end accept_connection() */ + + +/* --------------------------------------------------------------------------- + * Function: wait_for_child + * + * Purpose: Signal handler to reap zombie processes. + * --------------------------------------------------------------------------- + */ +static void +wait_for_child(int sig) +{ + while (HDwaitpid(-1, NULL, WNOHANG) > 0); +} /* end wait_for_child() */ + + +/* --------------------------------------------------------------------------- + * Function: handle_requests + * + * Purpose: Main working loop; process requests as they are received. + * Does nothing if the run option `help` is set. + * + * Return: -1 on error, else 0 for successful operation. + * --------------------------------------------------------------------------- + */ +static int +handle_requests(struct server_run *run) +{ + int connfd = -1; /**/ + char mybuf[H5FD_MIRROR_XMIT_OPEN_SIZE]; /**/ + int ret; /* general-purpose error-checking */ + int pid; /* process ID of fork */ + struct sigaction sa; + int ret_value = 0; + + if (run == NULL || run->magic != SERVER_RUN_MAGIC) { + mirror_log(NULL, V_ERR, "invalid server_run pointer"); + return -1; + } + + if (run->opts.help) { + return 0; + } + + if (run->listenfd < 0) { + mirror_log(NULL, V_ERR, "invalid listening socket"); + return -1; + } + + /* Set up the signal handler */ + sa.sa_handler = wait_for_child; + HDsigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + if (HDsigaction(SIGCHLD, &sa, NULL) == -1) { + perror("sigaction"); + return 1; + } + + /* Keep listening for attempts to connect. + */ + + while (1) { /* infinite loop, exited via break or goto */ + mirror_log(run->loginfo, V_INFO, "server waiting for connections..."); + + connfd = -1; + + connfd = accept_connection(run); + if (connfd < 0) { + mirror_log(run->loginfo, V_ERR, "unable to receive connection"); + goto error; + } + + /* Read handshake from port connection. + */ + + ret = (int)HDread(connfd, &mybuf, H5FD_MIRROR_XMIT_OPEN_SIZE); + if (-1 == ret) { + mirror_log(run->loginfo, V_ERR, "read:%d", ret); + goto error; + } + mirror_log(run->loginfo, V_INFO, "received %d bytes", ret); + mirror_log(run->loginfo, V_ALL, "```"); + mirror_log_bytes(run->loginfo, V_ALL, ret, + (const unsigned char *)mybuf); + mirror_log(run->loginfo, V_ALL, "```"); + + /* Respond to handshake message. + */ + + if (!HDstrncmp("SHUTDOWN", mybuf, 8)) { + /* Stop operation if told to stop */ + mirror_log(run->loginfo, V_INFO, "received SHUTDOWN!", ret); + HDclose(connfd); + connfd = -1; + goto done; + } /* end if explicit "SHUTDOWN" directive */ + else + if (H5FD_MIRROR_XMIT_OPEN_SIZE == ret) { + H5FD_mirror_xmit_open_t xopen; + + mirror_log(run->loginfo, V_INFO, + "probable OPEN xmit received"); + + H5FD_mirror_xmit_decode_open(&xopen, (const unsigned char *)mybuf); + if (FALSE == H5FD_mirror_xmit_is_open(&xopen)) { + mirror_log(run->loginfo, V_WARN, + "expected OPEN xmit was malformed"); + HDclose(connfd); + continue; + } + + mirror_log(run->loginfo, V_INFO, + "probable OPEN xmit confirmed"); + + pid = HDfork(); + if (pid < 0) { /* fork error */ + mirror_log(run->loginfo, V_ERR, "cannot fork"); + goto error; + } /* end if fork error */ + else + if (pid == 0) { /* child process (writer side of fork) */ + mirror_log(run->loginfo, V_INFO, + "executing writer"); + if (run_writer(connfd, &xopen) < 0) { + HDprintf("can't run writer\n"); + } + else { + HDprintf("writer OK\n"); + } + HDclose(connfd); + + HDexit(EXIT_SUCCESS); + } /* end if writer side of fork */ + else { /* parent process (server side of fork) */ + mirror_log(run->loginfo, V_INFO, "tidying up from handshake"); + HDclose(connfd); + } /* end if server side of fork */ + + } /* end else-if valid request for service */ + else { + /* Ignore unrecognized messages */ + HDclose(connfd); + continue; + } /* end else (not a valid message, to be ignored) */ + + } /* end while listening for new connections */ + +done: + if (connfd >= 0) { + mirror_log(run->loginfo, V_WARN, "connfd still open upon cleanup"); + HDclose(connfd); + } + + return ret_value; + +error: + if (connfd >= 0) { + HDclose(connfd); + } + return -1; +} /* end handle_requests() */ + + +/* ------------------------------------------------------------------------- */ +int +main(int argc, char **argv) +{ + struct server_run *run; + + run = init_server_run(argc, argv); + if (NULL == run) { + mirror_log(NULL, V_ERR, "can't initialize run"); + HDexit(EXIT_FAILURE); + } + + if (handle_requests(run) < 0) { + mirror_log(run->loginfo, V_ERR, "problem handling requests"); + } + + if (term_server_run(run) < 0) { + mirror_log(NULL, V_ERR, "problem closing server run"); + HDexit(EXIT_FAILURE); + } + + HDexit(EXIT_SUCCESS); +} /* end main() */ + +#else /* H5_HAVE_MIRROR_VFD */ + +int +main(int argc, char **argv) +{ + HDprintf("Mirror VFD was not built -- cannot launch server.\n"); + HDexit(EXIT_FAILURE); +} + +#endif /* H5_HAVE_MIRROR_VFD */ + diff --git a/utils/mirror_vfd/mirror_server_halten_sie.c b/utils/mirror_vfd/mirror_server_halten_sie.c new file mode 100644 index 0000000..ccd5824 --- /dev/null +++ b/utils/mirror_vfd/mirror_server_halten_sie.c @@ -0,0 +1,214 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* + * Purpose: Stop the mirror server + * Exists for cross-platform, optionally remote shutdown. + */ + +#include "H5private.h" /* System compatability call-wrapper macros */ + +#ifdef H5_HAVE_MIRROR_VFD + +#define MSHS_OPTS_MAGIC 0x613B1C15u /* sanity-checking constant */ +#define MSHS_IP_STR_SIZE 20 +#define MSHS_DEFAULT_IP "127.0.0.1" +#define MSHS_DEFAULT_PORTNO 3000 + + +/* ---------------------------------------------------------------------------- + * Structure: struct mshs_opts + * + * Purpose: Convenience structure to hold options as parsed from the + * command line. + * + * `magic` (uint32_t) + * Semi-unique constant to help verify pointer integrity. + * + * `help` (int) + * Flag that the help argument was present. + * + * `portno` (int) + * Port number, as received from arguments. + * + * `ip` (char *) + * IP address string as received from arguments. + * + * ---------------------------------------------------------------------------- + */ +struct mshs_opts { + uint32_t magic; + int help; + int portno; + char ip[MSHS_IP_STR_SIZE + 1]; +}; + + +/* ---------------------------------------------------------------------------- + * Function: usage + * + * Purpose: Print usage message to stdout. + * ---------------------------------------------------------------------------- + */ +static void +usage(void) +{ + HDprintf("mirror_server_halten_sie [options]\n" \ + "System-independent Mirror Server shutdown program.\n" \ + "Sends shutdown message to Mirror Server at given IP:port\n" \ + "\n" \ + "Options:\n" \ + " -h | --help Print this usage message and exit.\n" \ + " --ip=ADDR IP Address of remote server (defaut %s)\n" \ + " --port=PORT Handshake port of remote server (default %d)\n", + MSHS_DEFAULT_IP, + MSHS_DEFAULT_PORTNO); +} /* end usage() */ + + +/* ---------------------------------------------------------------------------- + * Function: parse_args + * + * Purpose: Parse command-line arguments, populating the options struct + * pointer as appropriate. + * Default values will be set for unspecified options. + * + * Return: 0 on success, negative (-1) if error. + * ---------------------------------------------------------------------------- + */ +static int +parse_args(int argc, char **argv, struct mshs_opts *opts) +{ + int i = 0; + + opts->magic = MSHS_OPTS_MAGIC; + opts->help = 0; + opts->portno = MSHS_DEFAULT_PORTNO; + HDstrncpy(opts->ip, MSHS_DEFAULT_IP, MSHS_IP_STR_SIZE); + + for (i=1; i < argc; i++) { /* start with first possible option argument */ + if (!HDstrncmp(argv[i], "-h", 3) || !HDstrncmp(argv[i], "--help", 7)) { + opts->help = 1; + } + else + if (!HDstrncmp(argv[i], "--ip=", 5)) { + HDstrncpy(opts->ip, argv[i]+5, MSHS_IP_STR_SIZE); + } + else + if (!HDstrncmp(argv[i], "--port=", 7)) { + opts->portno = HDatoi(argv[i]+7); + } + else { + HDprintf("Unrecognized option: '%s'\n", argv[i]); + usage(); + opts->magic++; /* invalidate for sanity */ + return -1; + } + } /* end for each argument from command line */ + + /* auto-replace 'localhost' with numeric IP */ + if (!HDstrncmp(opts->ip, "localhost", 10)) { /* include null terminator */ + HDstrncpy(opts->ip, "127.0.0.1", MSHS_IP_STR_SIZE); + } + + return 0; +} /* end parse_args() */ + + +/* ---------------------------------------------------------------------------- + * Function: send_shutdown + * + * Purpose: Create socket and send shutdown signal to remote server. + * + * Return: 0 on success, negative (-1) if error. + * ---------------------------------------------------------------------------- + */ +static int +send_shutdown(struct mshs_opts *opts) +{ + int live_socket; + struct sockaddr_in target_addr; + + if (opts->magic != MSHS_OPTS_MAGIC) { + HDprintf("invalid options structure\n"); + return -1; + } + + live_socket = HDsocket(AF_INET, SOCK_STREAM, 0); + if (live_socket < 0) { + HDprintf("ERROR socket()\n"); + return -1; + } + + target_addr.sin_family = AF_INET; + target_addr.sin_port = HDhtons((uint16_t)opts->portno); + target_addr.sin_addr.s_addr = HDinet_addr(opts->ip); + HDmemset(target_addr.sin_zero, '\0', sizeof(target_addr.sin_zero)); + + if (HDconnect(live_socket, (struct sockaddr *)&target_addr, + (socklen_t)sizeof(target_addr)) + < 0) + { + HDprintf("ERROR connect() (%d)\n%s\n", errno, HDstrerror(errno)); + return -1; + } + + if (HDwrite(live_socket, "SHUTDOWN", 9) == -1) { + HDprintf("ERROR write() (%d)\n%s\n", errno, HDstrerror(errno)); + return -1; + } + + if (HDclose(live_socket) < 0) { + HDprintf("ERROR close() can't close socket\n"); + return -1; + } + + return 0; +} /* end send_shutdown() */ + + +/* ------------------------------------------------------------------------- */ +int +main(int argc, char **argv) +{ + struct mshs_opts opts; + + if (parse_args(argc, argv, &opts) < 0) { + HDprintf("Unable to parse arguments\n"); + return EXIT_FAILURE; + } + + if (opts.help) { + usage(); + return EXIT_SUCCESS; + } + + if (send_shutdown(&opts) < 0) { + HDprintf("Unable to send shutdown command\n"); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} /* end main() */ + +#else /* H5_HAVE_MIRROR_VFD */ + + +/* ------------------------------------------------------------------------- */ +int +main(int argc, char **argv) +{ + HDprintf("Mirror VFD not built -- unable to perform shutdown.\n"); + return EXIT_FAILURE; +} + +#endif /* H5_HAVE_MIRROR_VFD */ diff --git a/utils/mirror_vfd/mirror_writer.c b/utils/mirror_vfd/mirror_writer.c new file mode 100644 index 0000000..9cd9b4c --- /dev/null +++ b/utils/mirror_vfd/mirror_writer.c @@ -0,0 +1,1064 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* + * Remote writer process for the mirror (socket) VFD. + * + * Writer is started with arguments for slaved port. + * Awaits a connection on the socket. + * Handles instructions from the master 'Driver' process. + * + * Current implementation uses Sec2 as the underlying driver when opening a + * file. This is reflected in the source (H5FDmirror.c) of the Mirror driver. + */ + +#include "mirror_remote.h" + +#ifdef H5_HAVE_MIRROR_VFD + +#define HEXDUMP_XMITS 1 /* Toggle whether to print xmit bytes-blob */ + /* in detailed logging */ +#define HEXDUMP_WRITEDATA 0 /* Toggle whether to print bytes to write */ + /* in detailed logging */ +#define LISTENQ 80 /* max pending Driver requests */ + +#define MW_SESSION_MAGIC 0x88F36B32u +#define MW_SOCK_COMM_MAGIC 0xDF10A157u +#define MW_OPTS_MAGIC 0x3BA8B462u + + +/* --------------------------------------------------------------------------- + * Structure: struct mirror_session + * + * Bundle of information used to manage the operation of this remote Writer + * in a "session" with the Driver process. + * + * magic (uint32_t) + * Semi-unique "magic" number used to sanity-check a structure for + * validity. MUST equal MW_SESSION_MAGIC to be valid. + * + * sockfd (int) + * File descriptor to the socket. + * Used for receiving bytes from and writing bytes to the Driver + * across the network. + * If not NULL, should be a valid descriptor. + * + * token (uint32t) + * Number used to help sanity-check received transmission from the Writer. + * Each Driver/Writer pairing should have a semi-unique "token" to help + * guard against commands from the wrong entity. + * + * xmit_count (uint32_t) + * Record of trasmissions received from the Driver. While the transmission + * protocol should be trustworthy, this serves as an additional guard. + * Starts a 0 and should be incremented for each one-way transmission. + * + * file (H5FD_t *) + * Virtual File handle for the hdf5 file. + * Set on file open if H5Fopen() is successful. If NULL, it is invalid. + * + * log_verbosity (unsigned int) + * The verbosity level for logging. Should be set to one of the values + * defined at the top of this file. + * + * log_stream (FILE *) + * File pointer to which logging output is written. Starts (and ends) + * with a default stream, such as stdout, but can be overridden at + * runtime. + * + * reply (H5FD_mirror_xmit_reply_t) + * Structure space for persistent reply data. + * Should be initialized with basic header info (magic, version, op), + * then with session info (token, xmit count), and finally with specific + * reply info (update xmit_count, status code, and message) before + * transmission. + * + * ---------------------------------------------------------------------------- + */ +struct mirror_session { + uint32_t magic; + int sockfd; + uint32_t token; + uint32_t xmit_count; + H5FD_t *file; + loginfo_t *loginfo; + H5FD_mirror_xmit_reply_t reply; +}; + + +/* --------------------------------------------------------------------------- + * Structure: struct sock_comm + * + * Structure for placing the data read and pre-processed from Driver in an + * organized fashion. Useful for pre-processing a received xmit. + * + * magic (uint32_t) + * Semi-unique number to sanity-check structure pointer and validity. + * Must equal MW_SOCK_COMM_MAGIC to be valid. + * + * recd_die (int) + * "Boolean" flag indicating that an explicit shutdown/kill/die command + * was received. Potentially useful for debugging and or "manual" + * operation of the program. + * 0 indicates normal operation, non-0 (1) indicates to die. + * + * xmit_recd (H5FD_mirror_xmit_t *) + * Structure pointer for the "xmit header" as decoded from the raw + * binary stream read from the socket. + * + * raw (char *) + * Pointer to a raw byte array, for storing data as read from the + * socket. Bytes buffer is decoded into xmit_t header and derivative + * structures. + * + * raw_size (size_t) + * Give the size of the `raw` buffer. + * + * --------------------------------------------------------------------------- + */ +struct sock_comm { + uint32_t magic; + int recd_die; + H5FD_mirror_xmit_t *xmit_recd; + char *raw; + size_t raw_size; +}; + + +/* --------------------------------------------------------------------------- + * Structure: struct mirror_writer_opts + * + * Container for default values and options as parsed from the command line. + * Currently rather vestigal, but may be expanded and/or moved to be set by + * Server and passed around as an argument. + * + * magic (uint32_t) + * Semi-unique number to sanity-check structure pointer and validity. + * Must equal MW_OPTS_MAGIC to be valid. + * + * logpath (char *) + * String pointer. Allocated at runtime. + * Specifies file location for logging output. + * May be NULL -- uses default output (e.g., stdout). + * + * ---------------------------------------------------------------------------- + */ +struct mirror_writer_opts { + uint32_t magic; + char *logpath; +}; + +static int do_open(struct mirror_session *session, + const H5FD_mirror_xmit_open_t *xmit_open); + + +/* --------------------------------------------------------------------------- + * Function: session_init + * + * Purpose: Populate mirror_session structure with default and + * options-drived values. + * + * Return: An allocated mirror_session structure pointer on success, + * else NULL. + * ---------------------------------------------------------------------------- + */ +static struct mirror_session * +session_init(struct mirror_writer_opts *opts) +{ + struct mirror_session *session = NULL; + + mirror_log(NULL, V_INFO, "session_init()"); + + if (NULL == opts || opts->magic != MW_OPTS_MAGIC) { + mirror_log(NULL, V_ERR, "invalid opts pointer"); + goto error; + } + + session = (struct mirror_session *)HDmalloc(sizeof(struct mirror_session)); + if (session == NULL) { + mirror_log(NULL, V_ERR, "can't allocate session structure"); + goto error; + } + + session->magic = MW_SESSION_MAGIC; + session->sockfd = -1; + session->xmit_count = 0; + session->token = 0; + session->file = NULL; + + session->reply.pub.magic = H5FD_MIRROR_XMIT_MAGIC; + session->reply.pub.version = H5FD_MIRROR_XMIT_CURR_VERSION; + session->reply.pub.op = H5FD_MIRROR_OP_REPLY; + session->reply.pub.session_token = 0; + HDbzero(session->reply.message, H5FD_MIRROR_STATUS_MESSAGE_MAX); + + /* Options-derived population + */ + + session->loginfo = mirror_log_init(opts->logpath, "W- ", + MIRROR_LOG_DEFAULT_VERBOSITY); + + return session; + +error: + if (session) { + HDfree(session); + } + return NULL; +} /* end session_init() */ + + +/* --------------------------------------------------------------------------- + * Function: session_stop + * + * Purpose: Stop and clean up a session. + * Only do this as part of program termination or aborting startup. + * + * Return: 0 on success, or negative sum of errors encountered. + * ---------------------------------------------------------------------------- + */ +static int +session_stop(struct mirror_session *session) +{ + int ret_value = 0; + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "session_stop()"); + + /* Close HDF5 file if it is still open (probably in error) */ + if (session->file) { + mirror_log(session->loginfo, V_WARN, "HDF5 file still open at cleanup"); + if (H5FDclose(session->file) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDclose() during cleanup!"); + ret_value--; + } + } + + /* Socket will be closed by parent side of server fork after exit */ + + /* Close custom logging stream */ + if (mirror_log_term(session->loginfo) < 0) { + mirror_log(NULL, V_ERR, "Problem closing logging stream"); + ret_value--; + } + session->loginfo = NULL; + + /* Invalidate and release structure */ + session->magic++; + HDfree(session); + + return ret_value; +} /* end session_stop() */ + + +/* --------------------------------------------------------------------------- + * Function: session_start + * + * Purpose: Initiate session, open files. + * + * Return: Success: A valid mirror_session pointer which must later be + * cleaned up with session_stop(). + * Failure: NULL, after cleaning up after itself. + * --------------------------------------------------------------------------- + */ +static struct mirror_session * +session_start(int socketfd, const H5FD_mirror_xmit_open_t *xmit_open) +{ + struct mirror_session *session = NULL; + struct mirror_writer_opts opts; +#if 0 /* TODO: behaviro option */ + char logpath[H5FD_MIRROR_XMIT_FILEPATH_MAX] = ""; +#endif + + mirror_log(NULL, V_INFO, "session_start()"); + + if (FALSE == H5FD_mirror_xmit_is_open(xmit_open)) { + mirror_log(NULL, V_ERR, "invalid OPEN xmit"); + return NULL; + } + + opts.magic = MW_OPTS_MAGIC; +#if 0 /* TODO: behavior option */ + HDsnprintf(logpath, H5FD_MIRROR_XMIT_FILEPATH_MAX, "%s.log", + xmit_open->filename); + opts.logpath = logpath; +#else + opts.logpath = NULL; +#endif + + session = session_init(&opts); + if (NULL == session) { + mirror_log(NULL, V_ERR, "can't instantiate session"); + goto error; + } + + session->sockfd = socketfd; + + if (do_open(session, xmit_open) < 0) { + mirror_log(NULL, V_ERR, "unable to open file"); + goto error; + } + + return session; + +error: + if (session != NULL) { + if (session_stop(session) < 0) { + mirror_log(NULL, V_WARN, "Can't abort session init"); + } + session = NULL; + } + return NULL; +} + + +/* --------------------------------------------------------------------------- + * Function: _xmit_reply + * + * Purpose: Common operations to send a reply xmit through the session. + * + * Return: 0 on success, -1 if error. + * ---------------------------------------------------------------------------- + */ +static int +_xmit_reply(struct mirror_session *session) +{ + unsigned char xmit_buf[H5FD_MIRROR_XMIT_REPLY_SIZE]; + H5FD_mirror_xmit_reply_t *reply = &(session->reply); + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_ALL, "_xmit_reply()"); + + reply->pub.xmit_count = session->xmit_count++; + if (H5FD_mirror_xmit_encode_reply(xmit_buf, + (const H5FD_mirror_xmit_reply_t *)reply) + != H5FD_MIRROR_XMIT_REPLY_SIZE) + { + mirror_log(session->loginfo, V_ERR, "can't encode reply"); + return -1; + } + + mirror_log(session->loginfo, V_ALL, "reply xmit data\n```"); + mirror_log_bytes(session->loginfo, V_ALL, H5FD_MIRROR_XMIT_REPLY_SIZE, + (const unsigned char *)xmit_buf); + mirror_log(session->loginfo, V_ALL, "```"); + + if (HDwrite(session->sockfd, xmit_buf, H5FD_MIRROR_XMIT_REPLY_SIZE) < 0) { + mirror_log(session->loginfo, V_ERR, "can't write reply to Driver"); + return -1; + } + + return 0; +} /* end _xmit_reply() */ + + +/* --------------------------------------------------------------------------- + * Function: reply_ok + * + * Purpose: Send an OK reply through the session. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +reply_ok(struct mirror_session *session) +{ + H5FD_mirror_xmit_reply_t *reply = &(session->reply); + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_ALL, "reply_ok()"); + + reply->status = H5FD_MIRROR_STATUS_OK; + HDbzero(reply->message, H5FD_MIRROR_STATUS_MESSAGE_MAX); + return _xmit_reply(session); +} /* end reply_ok() */ + + +/* --------------------------------------------------------------------------- + * Function: reply_error + * + * Purpose: Send an ERROR reply with message through the session. + * Message may be cut short if it would overflow the available + * buffer in the xmit. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +reply_error(struct mirror_session *session, + const char *msg) +{ + H5FD_mirror_xmit_reply_t *reply = &(session->reply); + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_ALL, "reply_error(%s)", msg); + + reply->status = H5FD_MIRROR_STATUS_ERROR; + HDsnprintf(reply->message, H5FD_MIRROR_STATUS_MESSAGE_MAX-1, "%s", msg); + return _xmit_reply(session); +} /* end reply_error() */ + + +/* --------------------------------------------------------------------------- + * Function: do_close + * + * Purpose: Handle an CLOSE operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_close(struct mirror_session *session) +{ + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "do_close()"); + + if (NULL == session->file) { + mirror_log(session->loginfo, V_ERR, "no file to close!"); + reply_error(session, "no file to close"); + return -1; + } + + if (H5FDclose(session->file) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDclose()"); + reply_error(session, "H5FDclose()"); + return -1; + } + session->file = NULL; + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_close() */ + + +/* --------------------------------------------------------------------------- + * Function: do_lock + * + * Purpose: Handle a LOCK operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_lock(struct mirror_session *session, + const unsigned char *xmit_buf) +{ + size_t decode_ret = 0; + H5FD_mirror_xmit_lock_t xmit_lock; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf); + + mirror_log(session->loginfo, V_INFO, "do_lock()"); + + decode_ret = H5FD_mirror_xmit_decode_lock(&xmit_lock, xmit_buf); + if (H5FD_MIRROR_XMIT_LOCK_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, "can't decode set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decoding size failure"); + return -1; + } + + if (!H5FD_mirror_xmit_is_lock(&xmit_lock)) { + mirror_log(session->loginfo, V_ERR, "not a set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decode failure"); + return -1; + } + mirror_log(session->loginfo, V_INFO, "lock rw: (%d)", xmit_lock.rw); + + if (H5FDlock(session->file, (hbool_t)xmit_lock.rw) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDlock()"); + reply_error(session, "remote H5FDlock() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_lock() */ + + +/* --------------------------------------------------------------------------- + * Function: do_open + * + * Purpose: Handle an OPEN operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_open(struct mirror_session *session, + const H5FD_mirror_xmit_open_t *xmit_open) +{ + hid_t fapl_id = H5I_INVALID_HID; + unsigned _flags = 0; + haddr_t _maxaddr = HADDR_UNDEF; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_open && + TRUE == H5FD_mirror_xmit_is_open(xmit_open)); + + mirror_log(session->loginfo, V_INFO, "do_open()"); + + if (0 != xmit_open->pub.xmit_count) { + mirror_log(session->loginfo, V_ERR, "open with xmit count not zero!"); + reply_error(session, "initial transmission count not zero"); + goto error; + } + if (0 != session->token) { + mirror_log(session->loginfo, V_ERR, "open with token already set!"); + reply_error(session, "initial session token not zero"); + goto error; + } + + session->xmit_count = 1; + session->token = xmit_open->pub.session_token; + session->reply.pub.session_token = session->token; + + _flags = (unsigned)xmit_open->flags; + _maxaddr = (haddr_t)xmit_open->maxaddr; + + /* Check whether the native size_t on the remote machine (Driver) is larger + * than that on the local machine; if so, issue a warning. + * The blob is always an 8-byte bitfield -- check its contents. + */ + if (xmit_open->size_t_blob > (uint64_t)((size_t)(-1))) { + mirror_log(session->loginfo, V_WARN, + "Driver size_t is larger than our own"); + } + + mirror_log(session->loginfo, V_INFO, + "to open file %s (flags %d) (maxaddr %d)", + xmit_open->filename, _flags, _maxaddr); + + /* Explicitly use Sec2 as the underlying driver for now. + */ + fapl_id = H5Pcreate(H5P_FILE_ACCESS); + if (fapl_id < 0) { + mirror_log(session->loginfo, V_ERR, "can't create FAPL"); + reply_error(session, "H5Pcreate() failure"); + goto error; + } + if (H5Pset_fapl_sec2(fapl_id) < 0) { + mirror_log(session->loginfo, V_ERR, "can't set FAPL as Sec2"); + reply_error(session, "H5Pset_fapl_sec2() failure"); + goto error; + } + + session->file = H5FDopen(xmit_open->filename, _flags, fapl_id, _maxaddr); + if (NULL == session->file) { + mirror_log(session->loginfo, V_ERR, "H5FDopen()"); + reply_error(session, "remote H5FDopen() failure"); + goto error; + } + + /* FAPL is set and in use; clean up */ + if (H5Pclose(fapl_id) < 0) { + mirror_log(session->loginfo, V_ERR, "can't set FAPL as Sec2"); + reply_error(session, "H5Pset_fapl_sec2() failure"); + goto error; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; + +error: + if (fapl_id > 0) { + H5E_BEGIN_TRY { + (void)H5Pclose(fapl_id); + } H5E_END_TRY; + } + return -1; +} /* end do_open() */ + + +/* --------------------------------------------------------------------------- + * Function: do_set_eoa + * + * Purpose: Handle a SET_EOA operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_set_eoa(struct mirror_session *session, + const unsigned char *xmit_buf) +{ + size_t decode_ret = 0; + H5FD_mirror_xmit_eoa_t xmit_seoa; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf); + + mirror_log(session->loginfo, V_INFO, "do_set_eoa()"); + + decode_ret = H5FD_mirror_xmit_decode_set_eoa(&xmit_seoa, xmit_buf); + if (H5FD_MIRROR_XMIT_EOA_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, "can't decode set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decoding size failure"); + return -1; + } + + if (!H5FD_mirror_xmit_is_set_eoa(&xmit_seoa)) { + mirror_log(session->loginfo, V_ERR, "not a set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decode failure"); + return -1; + } + + mirror_log(session->loginfo, V_INFO, "set EOA addr %d", + xmit_seoa.eoa_addr); + + if (H5FDset_eoa(session->file, (H5FD_mem_t)xmit_seoa.type, + (haddr_t)xmit_seoa.eoa_addr) + < 0) + { + mirror_log(session->loginfo, V_ERR, "H5FDset_eoa()"); + reply_error(session, "remote H5FDset_eoa() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_set_eoa() */ + + +/* --------------------------------------------------------------------------- + * Function: do_truncate + * + * Purpose: Handle a TRUNCATE operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_truncate(struct mirror_session *session) +{ + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "do_truncate()"); + + /* default DXPL ID (0), 0 for "FALSE" closing -- both probably unused */ + if (H5FDtruncate(session->file, 0, 0) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDtruncate()"); + reply_error(session, "remote H5FDtruncate() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_truncate() */ + + +/* --------------------------------------------------------------------------- + * Function: do_unlock + * + * Purpose: Handle an UNLOCK operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_unlock(struct mirror_session *session) +{ + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "do_unlock()"); + + if (H5FDunlock(session->file) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDunlock()"); + reply_error(session, "remote H5FDunlock() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_unlock() */ + + +/* --------------------------------------------------------------------------- + * Function: do_write + * + * Purpose: Handle a WRITE operation. + * Receives command, replies; receives & writes data, replies. + * + * It is known that this results in suboptimal performance, + * but handling both small and very, very large write buffers + * with a single "over the wire" exchange + * poses design challenges not worth tackling as of March 2020. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_write(struct mirror_session *session, + const unsigned char *xmit_buf) +{ + size_t decode_ret = 0; + haddr_t addr = 0; + haddr_t sum_bytes_written = 0; + H5FD_mem_t type = 0; + char *buf = NULL; + ssize_t nbytes_in_packet = 0; + H5FD_mirror_xmit_write_t xmit_write; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf); + + mirror_log(session->loginfo, V_INFO, "do_write()"); + + if (NULL == session->file) { + mirror_log(session->loginfo, V_ERR, "no open file!"); + reply_error(session, "no file open on remote"); + return -1; + } + + decode_ret = H5FD_mirror_xmit_decode_write(&xmit_write, xmit_buf); + if (H5FD_MIRROR_XMIT_WRITE_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, "can't decode write xmit"); + reply_error(session, "remote xmit_write_t decoding size failure"); + return -1; + } + + if (!H5FD_mirror_xmit_is_write(&xmit_write)) { + mirror_log(session->loginfo, V_ERR, "not a write xmit"); + reply_error(session, "remote xmit_write_t decode failure"); + return -1; + } + + addr = (haddr_t)xmit_write.offset; + type = (H5FD_mem_t)xmit_write.type; + + /* Allocate the buffer once -- re-use between loops. + */ + buf = (char *)HDmalloc(sizeof(char) * H5FD_MIRROR_DATA_BUFFER_MAX); + if (NULL == buf) { + mirror_log(session->loginfo, V_ERR, "can't allocate databuffer"); + reply_error(session, "can't allocate buffer for receiving data"); + return -1; + } + + /* got write signal; ready for data */ + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + mirror_log(session->loginfo, V_INFO, "to write %zu bytes at %zu", + xmit_write.size, + addr); + + /* The given write may be: + * 1. larger than the allowed single buffer size + * 2. larger than the native size_t of this system + * + * Handle all cases by looping, ingesting as much of the stream as possible + * and writing that part to the file. + */ + sum_bytes_written = 0; + do { + nbytes_in_packet = HDread(session->sockfd, buf, + H5FD_MIRROR_DATA_BUFFER_MAX); + if (-1 == nbytes_in_packet) { + mirror_log(session->loginfo, V_ERR, "can't read into databuffer"); + reply_error(session, "can't read data buffer"); + return -1; + } + + mirror_log(session->loginfo, V_INFO, "received %zd bytes", + nbytes_in_packet); + if (HEXDUMP_WRITEDATA) { + mirror_log(session->loginfo, V_ALL, "DATA:\n```"); + mirror_log_bytes(session->loginfo, V_ALL, nbytes_in_packet, + (const unsigned char *)buf); + mirror_log(session->loginfo, V_ALL, "```"); + } + + mirror_log(session->loginfo, V_INFO, "writing %zd bytes at %zu", + nbytes_in_packet, + (addr + sum_bytes_written)); + + if (H5FDwrite(session->file, type, H5P_DEFAULT, + (addr + sum_bytes_written), (size_t)nbytes_in_packet, buf) + < 0) + { + mirror_log(session->loginfo, V_ERR, "H5FDwrite()"); + reply_error(session, "remote H5FDwrite() failure"); + return -1; + } + + sum_bytes_written += (haddr_t)nbytes_in_packet; + + } while (sum_bytes_written < xmit_write.size); /* end while ingesting */ + + HDfree(buf); + + /* signal that we're done here and a-ok */ + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_write() */ + + +/* --------------------------------------------------------------------------- + * Function: receive_communique + * + * Purpose: Accept bytes from the socket, check for emergency shutdown, and + * sanity-check received bytes. + * The raw bytes read are stored in the sock_comm structure at + * comm->raw. + * The raw bytes are decoded and a xmit_t (header) struct pointer + * in comm is populated at comm->xmit_recd. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +receive_communique( + struct mirror_session *session, + struct sock_comm *comm) +{ + ssize_t read_ret = 0; + size_t decode_ret; + H5FD_mirror_xmit_t *X = comm->xmit_recd; + + HDassert((session != NULL) && \ + (session->magic == MW_SESSION_MAGIC) && \ + (comm != NULL) && \ + (comm->magic == MW_SOCK_COMM_MAGIC) && \ + (comm->xmit_recd != NULL) && \ + (comm->raw != NULL) && \ + (comm->raw_size >= H5FD_MIRROR_XMIT_BUFFER_MAX) ); + + mirror_log(session->loginfo, V_INFO, "receive_communique()"); + + HDbzero(comm->raw, comm->raw_size); + comm->recd_die = 0; + + mirror_log(session->loginfo, V_INFO, "ready to receive"); /* TODO */ + + read_ret = HDread(session->sockfd, comm->raw, H5FD_MIRROR_XMIT_BUFFER_MAX); + if (-1 == read_ret) { + mirror_log(session->loginfo, V_ERR, "read:%zd", read_ret); + goto error; + } + + mirror_log(session->loginfo, V_INFO, "received %zd bytes", read_ret); + if (HEXDUMP_XMITS) { + mirror_log(session->loginfo, V_ALL, "```", read_ret); + mirror_log_bytes(session->loginfo, V_ALL, (size_t)read_ret, + (const unsigned char *)comm->raw); + mirror_log(session->loginfo, V_ALL, "```"); + } /* end if hexdump transmissions received */ + + /* old-fashioned manual kill (for debugging) */ + if (!HDstrncmp("GOODBYE", comm->raw, 7)) { + mirror_log(session->loginfo, V_INFO, "received GOODBYE"); + comm->recd_die = 1; + goto done; + } + + decode_ret = H5FD_mirror_xmit_decode_header(X, + (const unsigned char *)comm->raw); + if (H5FD_MIRROR_XMIT_HEADER_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, + "header decode size mismatch: expected (%z), got (%z)", + H5FD_MIRROR_XMIT_HEADER_SIZE, decode_ret); + /* Try to tell Driver that it should stop */ + reply_error(session, "xmit size mismatch"); + goto error; + } + + if (!H5FD_mirror_xmit_is_xmit(X)) { + mirror_log(session->loginfo, V_ERR, "bad magic: 0x%X", X->magic); + /* Try to tell Driver that it should stop */ + reply_error(session, "bad magic"); + goto error; + } + + if (session->xmit_count != X->xmit_count) { + mirror_log(session->loginfo, V_ERR, + "xmit_count mismatch exp:%d recd:%d", + session->xmit_count, X->xmit_count); + /* Try to tell Driver that it should stop */ + reply_error(session, "xmit_count mismatch"); + goto error; + } + + if ( (session->token > 0) && (session->token != X->session_token) ) { + mirror_log(session->loginfo, V_ERR, "wrong session"); + /* Try to tell Driver that it should stop */ + reply_error(session, "wrong session"); + goto error; + } + + session->xmit_count++; + +done: + return 0; + +error: + return -1; +} /* end receive_communique() */ + + +/* --------------------------------------------------------------------------- + * Function: process_instructions + * + * Purpose: Receive and handle all instructions from Driver. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +process_instructions(struct mirror_session *session) +{ + struct sock_comm comm; + char xmit_buf[H5FD_MIRROR_XMIT_BUFFER_MAX]; /* raw bytes */ + H5FD_mirror_xmit_t xmit_recd; /* for decoded xmit header */ + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "process_instructions()"); + + comm.magic = MW_SOCK_COMM_MAGIC; + comm.recd_die = 0; /* Flag for program to terminate */ + comm.xmit_recd = &xmit_recd; + comm.raw = xmit_buf; + comm.raw_size = sizeof(xmit_buf); + + while (1) { /* sill-listening infinite loop */ + + /* Use convenience structure for raw/decoded info in/out */ + if (receive_communique(session, &comm) < 0) { + mirror_log(session->loginfo, V_ERR, "problem reading socket"); + return -1; + } + + if (comm.recd_die) { + goto done; + } + + switch(xmit_recd.op) { + case H5FD_MIRROR_OP_CLOSE: + if (do_close(session) < 0) { + return -1; + } + goto done; + case H5FD_MIRROR_OP_LOCK: + if (do_lock(session, (const unsigned char *)xmit_buf) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_OPEN: + mirror_log(session->loginfo, V_ERR, "OPEN xmit during session"); + reply_error(session, "illegal OPEN xmit during session"); + return -1; + case H5FD_MIRROR_OP_SET_EOA: + if (do_set_eoa(session, (const unsigned char *)xmit_buf) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_TRUNCATE: + if (do_truncate(session) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_UNLOCK: + if (do_unlock(session) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_WRITE: + if (do_write(session, (const unsigned char *)xmit_buf) < 0) { + return -1; + } + break; + default: + mirror_log(session->loginfo, V_ERR, "unrecognized transmission"); + reply_error(session, "unrecognized transmission"); + return -1; + } /* end switch (xmit_recd.op) */ + + } /* end while still listening */ + +done: + comm.magic = 0; /* invalidate structure, on principle */ + xmit_recd.magic = 0; /* invalidate structure, on principle */ + return 0; +} /* end process_instructions() */ + + +/* ------------------------------------------------------------------------- */ +herr_t +run_writer(int socketfd, H5FD_mirror_xmit_open_t *xmit_open) +{ + struct mirror_session *session = NULL; + int ret_value = SUCCEED; + + session = session_start(socketfd, xmit_open); + if (NULL == session) { + mirror_log(NULL, V_ERR, "Can't start session -- aborting"); + ret_value = FAIL; + } + else { + if (process_instructions(session) < 0) { + mirror_log(session->loginfo, V_ERR, + "problem processing instructions"); + ret_value = FAIL; + } + if (session_stop(session) < 0) { + mirror_log(NULL, V_ERR, "Can't stop session -- going down hard"); + ret_value = FAIL; + } + } + + return ret_value; +} /* end run_writer */ + +#endif /* H5_HAVE_MIRROR_VFD */ + |