diff options
Diffstat (limited to 'utils/mirror_vfd/mirror_writer.c')
-rw-r--r-- | utils/mirror_vfd/mirror_writer.c | 1064 |
1 files changed, 1064 insertions, 0 deletions
diff --git a/utils/mirror_vfd/mirror_writer.c b/utils/mirror_vfd/mirror_writer.c new file mode 100644 index 0000000..9cd9b4c --- /dev/null +++ b/utils/mirror_vfd/mirror_writer.c @@ -0,0 +1,1064 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright by The HDF Group. * + * All rights reserved. * + * * + * This file is part of HDF5. The full HDF5 copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the root of the source code * + * distribution tree, or in https://support.hdfgroup.org/ftp/HDF5/releases. * + * If you do not have access to either file, you may request a copy from * + * help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +/* + * Remote writer process for the mirror (socket) VFD. + * + * Writer is started with arguments for slaved port. + * Awaits a connection on the socket. + * Handles instructions from the master 'Driver' process. + * + * Current implementation uses Sec2 as the underlying driver when opening a + * file. This is reflected in the source (H5FDmirror.c) of the Mirror driver. + */ + +#include "mirror_remote.h" + +#ifdef H5_HAVE_MIRROR_VFD + +#define HEXDUMP_XMITS 1 /* Toggle whether to print xmit bytes-blob */ + /* in detailed logging */ +#define HEXDUMP_WRITEDATA 0 /* Toggle whether to print bytes to write */ + /* in detailed logging */ +#define LISTENQ 80 /* max pending Driver requests */ + +#define MW_SESSION_MAGIC 0x88F36B32u +#define MW_SOCK_COMM_MAGIC 0xDF10A157u +#define MW_OPTS_MAGIC 0x3BA8B462u + + +/* --------------------------------------------------------------------------- + * Structure: struct mirror_session + * + * Bundle of information used to manage the operation of this remote Writer + * in a "session" with the Driver process. + * + * magic (uint32_t) + * Semi-unique "magic" number used to sanity-check a structure for + * validity. MUST equal MW_SESSION_MAGIC to be valid. + * + * sockfd (int) + * File descriptor to the socket. + * Used for receiving bytes from and writing bytes to the Driver + * across the network. + * If not NULL, should be a valid descriptor. + * + * token (uint32t) + * Number used to help sanity-check received transmission from the Writer. + * Each Driver/Writer pairing should have a semi-unique "token" to help + * guard against commands from the wrong entity. + * + * xmit_count (uint32_t) + * Record of trasmissions received from the Driver. While the transmission + * protocol should be trustworthy, this serves as an additional guard. + * Starts a 0 and should be incremented for each one-way transmission. + * + * file (H5FD_t *) + * Virtual File handle for the hdf5 file. + * Set on file open if H5Fopen() is successful. If NULL, it is invalid. + * + * log_verbosity (unsigned int) + * The verbosity level for logging. Should be set to one of the values + * defined at the top of this file. + * + * log_stream (FILE *) + * File pointer to which logging output is written. Starts (and ends) + * with a default stream, such as stdout, but can be overridden at + * runtime. + * + * reply (H5FD_mirror_xmit_reply_t) + * Structure space for persistent reply data. + * Should be initialized with basic header info (magic, version, op), + * then with session info (token, xmit count), and finally with specific + * reply info (update xmit_count, status code, and message) before + * transmission. + * + * ---------------------------------------------------------------------------- + */ +struct mirror_session { + uint32_t magic; + int sockfd; + uint32_t token; + uint32_t xmit_count; + H5FD_t *file; + loginfo_t *loginfo; + H5FD_mirror_xmit_reply_t reply; +}; + + +/* --------------------------------------------------------------------------- + * Structure: struct sock_comm + * + * Structure for placing the data read and pre-processed from Driver in an + * organized fashion. Useful for pre-processing a received xmit. + * + * magic (uint32_t) + * Semi-unique number to sanity-check structure pointer and validity. + * Must equal MW_SOCK_COMM_MAGIC to be valid. + * + * recd_die (int) + * "Boolean" flag indicating that an explicit shutdown/kill/die command + * was received. Potentially useful for debugging and or "manual" + * operation of the program. + * 0 indicates normal operation, non-0 (1) indicates to die. + * + * xmit_recd (H5FD_mirror_xmit_t *) + * Structure pointer for the "xmit header" as decoded from the raw + * binary stream read from the socket. + * + * raw (char *) + * Pointer to a raw byte array, for storing data as read from the + * socket. Bytes buffer is decoded into xmit_t header and derivative + * structures. + * + * raw_size (size_t) + * Give the size of the `raw` buffer. + * + * --------------------------------------------------------------------------- + */ +struct sock_comm { + uint32_t magic; + int recd_die; + H5FD_mirror_xmit_t *xmit_recd; + char *raw; + size_t raw_size; +}; + + +/* --------------------------------------------------------------------------- + * Structure: struct mirror_writer_opts + * + * Container for default values and options as parsed from the command line. + * Currently rather vestigal, but may be expanded and/or moved to be set by + * Server and passed around as an argument. + * + * magic (uint32_t) + * Semi-unique number to sanity-check structure pointer and validity. + * Must equal MW_OPTS_MAGIC to be valid. + * + * logpath (char *) + * String pointer. Allocated at runtime. + * Specifies file location for logging output. + * May be NULL -- uses default output (e.g., stdout). + * + * ---------------------------------------------------------------------------- + */ +struct mirror_writer_opts { + uint32_t magic; + char *logpath; +}; + +static int do_open(struct mirror_session *session, + const H5FD_mirror_xmit_open_t *xmit_open); + + +/* --------------------------------------------------------------------------- + * Function: session_init + * + * Purpose: Populate mirror_session structure with default and + * options-drived values. + * + * Return: An allocated mirror_session structure pointer on success, + * else NULL. + * ---------------------------------------------------------------------------- + */ +static struct mirror_session * +session_init(struct mirror_writer_opts *opts) +{ + struct mirror_session *session = NULL; + + mirror_log(NULL, V_INFO, "session_init()"); + + if (NULL == opts || opts->magic != MW_OPTS_MAGIC) { + mirror_log(NULL, V_ERR, "invalid opts pointer"); + goto error; + } + + session = (struct mirror_session *)HDmalloc(sizeof(struct mirror_session)); + if (session == NULL) { + mirror_log(NULL, V_ERR, "can't allocate session structure"); + goto error; + } + + session->magic = MW_SESSION_MAGIC; + session->sockfd = -1; + session->xmit_count = 0; + session->token = 0; + session->file = NULL; + + session->reply.pub.magic = H5FD_MIRROR_XMIT_MAGIC; + session->reply.pub.version = H5FD_MIRROR_XMIT_CURR_VERSION; + session->reply.pub.op = H5FD_MIRROR_OP_REPLY; + session->reply.pub.session_token = 0; + HDbzero(session->reply.message, H5FD_MIRROR_STATUS_MESSAGE_MAX); + + /* Options-derived population + */ + + session->loginfo = mirror_log_init(opts->logpath, "W- ", + MIRROR_LOG_DEFAULT_VERBOSITY); + + return session; + +error: + if (session) { + HDfree(session); + } + return NULL; +} /* end session_init() */ + + +/* --------------------------------------------------------------------------- + * Function: session_stop + * + * Purpose: Stop and clean up a session. + * Only do this as part of program termination or aborting startup. + * + * Return: 0 on success, or negative sum of errors encountered. + * ---------------------------------------------------------------------------- + */ +static int +session_stop(struct mirror_session *session) +{ + int ret_value = 0; + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "session_stop()"); + + /* Close HDF5 file if it is still open (probably in error) */ + if (session->file) { + mirror_log(session->loginfo, V_WARN, "HDF5 file still open at cleanup"); + if (H5FDclose(session->file) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDclose() during cleanup!"); + ret_value--; + } + } + + /* Socket will be closed by parent side of server fork after exit */ + + /* Close custom logging stream */ + if (mirror_log_term(session->loginfo) < 0) { + mirror_log(NULL, V_ERR, "Problem closing logging stream"); + ret_value--; + } + session->loginfo = NULL; + + /* Invalidate and release structure */ + session->magic++; + HDfree(session); + + return ret_value; +} /* end session_stop() */ + + +/* --------------------------------------------------------------------------- + * Function: session_start + * + * Purpose: Initiate session, open files. + * + * Return: Success: A valid mirror_session pointer which must later be + * cleaned up with session_stop(). + * Failure: NULL, after cleaning up after itself. + * --------------------------------------------------------------------------- + */ +static struct mirror_session * +session_start(int socketfd, const H5FD_mirror_xmit_open_t *xmit_open) +{ + struct mirror_session *session = NULL; + struct mirror_writer_opts opts; +#if 0 /* TODO: behaviro option */ + char logpath[H5FD_MIRROR_XMIT_FILEPATH_MAX] = ""; +#endif + + mirror_log(NULL, V_INFO, "session_start()"); + + if (FALSE == H5FD_mirror_xmit_is_open(xmit_open)) { + mirror_log(NULL, V_ERR, "invalid OPEN xmit"); + return NULL; + } + + opts.magic = MW_OPTS_MAGIC; +#if 0 /* TODO: behavior option */ + HDsnprintf(logpath, H5FD_MIRROR_XMIT_FILEPATH_MAX, "%s.log", + xmit_open->filename); + opts.logpath = logpath; +#else + opts.logpath = NULL; +#endif + + session = session_init(&opts); + if (NULL == session) { + mirror_log(NULL, V_ERR, "can't instantiate session"); + goto error; + } + + session->sockfd = socketfd; + + if (do_open(session, xmit_open) < 0) { + mirror_log(NULL, V_ERR, "unable to open file"); + goto error; + } + + return session; + +error: + if (session != NULL) { + if (session_stop(session) < 0) { + mirror_log(NULL, V_WARN, "Can't abort session init"); + } + session = NULL; + } + return NULL; +} + + +/* --------------------------------------------------------------------------- + * Function: _xmit_reply + * + * Purpose: Common operations to send a reply xmit through the session. + * + * Return: 0 on success, -1 if error. + * ---------------------------------------------------------------------------- + */ +static int +_xmit_reply(struct mirror_session *session) +{ + unsigned char xmit_buf[H5FD_MIRROR_XMIT_REPLY_SIZE]; + H5FD_mirror_xmit_reply_t *reply = &(session->reply); + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_ALL, "_xmit_reply()"); + + reply->pub.xmit_count = session->xmit_count++; + if (H5FD_mirror_xmit_encode_reply(xmit_buf, + (const H5FD_mirror_xmit_reply_t *)reply) + != H5FD_MIRROR_XMIT_REPLY_SIZE) + { + mirror_log(session->loginfo, V_ERR, "can't encode reply"); + return -1; + } + + mirror_log(session->loginfo, V_ALL, "reply xmit data\n```"); + mirror_log_bytes(session->loginfo, V_ALL, H5FD_MIRROR_XMIT_REPLY_SIZE, + (const unsigned char *)xmit_buf); + mirror_log(session->loginfo, V_ALL, "```"); + + if (HDwrite(session->sockfd, xmit_buf, H5FD_MIRROR_XMIT_REPLY_SIZE) < 0) { + mirror_log(session->loginfo, V_ERR, "can't write reply to Driver"); + return -1; + } + + return 0; +} /* end _xmit_reply() */ + + +/* --------------------------------------------------------------------------- + * Function: reply_ok + * + * Purpose: Send an OK reply through the session. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +reply_ok(struct mirror_session *session) +{ + H5FD_mirror_xmit_reply_t *reply = &(session->reply); + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_ALL, "reply_ok()"); + + reply->status = H5FD_MIRROR_STATUS_OK; + HDbzero(reply->message, H5FD_MIRROR_STATUS_MESSAGE_MAX); + return _xmit_reply(session); +} /* end reply_ok() */ + + +/* --------------------------------------------------------------------------- + * Function: reply_error + * + * Purpose: Send an ERROR reply with message through the session. + * Message may be cut short if it would overflow the available + * buffer in the xmit. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +reply_error(struct mirror_session *session, + const char *msg) +{ + H5FD_mirror_xmit_reply_t *reply = &(session->reply); + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_ALL, "reply_error(%s)", msg); + + reply->status = H5FD_MIRROR_STATUS_ERROR; + HDsnprintf(reply->message, H5FD_MIRROR_STATUS_MESSAGE_MAX-1, "%s", msg); + return _xmit_reply(session); +} /* end reply_error() */ + + +/* --------------------------------------------------------------------------- + * Function: do_close + * + * Purpose: Handle an CLOSE operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_close(struct mirror_session *session) +{ + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "do_close()"); + + if (NULL == session->file) { + mirror_log(session->loginfo, V_ERR, "no file to close!"); + reply_error(session, "no file to close"); + return -1; + } + + if (H5FDclose(session->file) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDclose()"); + reply_error(session, "H5FDclose()"); + return -1; + } + session->file = NULL; + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_close() */ + + +/* --------------------------------------------------------------------------- + * Function: do_lock + * + * Purpose: Handle a LOCK operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_lock(struct mirror_session *session, + const unsigned char *xmit_buf) +{ + size_t decode_ret = 0; + H5FD_mirror_xmit_lock_t xmit_lock; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf); + + mirror_log(session->loginfo, V_INFO, "do_lock()"); + + decode_ret = H5FD_mirror_xmit_decode_lock(&xmit_lock, xmit_buf); + if (H5FD_MIRROR_XMIT_LOCK_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, "can't decode set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decoding size failure"); + return -1; + } + + if (!H5FD_mirror_xmit_is_lock(&xmit_lock)) { + mirror_log(session->loginfo, V_ERR, "not a set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decode failure"); + return -1; + } + mirror_log(session->loginfo, V_INFO, "lock rw: (%d)", xmit_lock.rw); + + if (H5FDlock(session->file, (hbool_t)xmit_lock.rw) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDlock()"); + reply_error(session, "remote H5FDlock() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_lock() */ + + +/* --------------------------------------------------------------------------- + * Function: do_open + * + * Purpose: Handle an OPEN operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_open(struct mirror_session *session, + const H5FD_mirror_xmit_open_t *xmit_open) +{ + hid_t fapl_id = H5I_INVALID_HID; + unsigned _flags = 0; + haddr_t _maxaddr = HADDR_UNDEF; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_open && + TRUE == H5FD_mirror_xmit_is_open(xmit_open)); + + mirror_log(session->loginfo, V_INFO, "do_open()"); + + if (0 != xmit_open->pub.xmit_count) { + mirror_log(session->loginfo, V_ERR, "open with xmit count not zero!"); + reply_error(session, "initial transmission count not zero"); + goto error; + } + if (0 != session->token) { + mirror_log(session->loginfo, V_ERR, "open with token already set!"); + reply_error(session, "initial session token not zero"); + goto error; + } + + session->xmit_count = 1; + session->token = xmit_open->pub.session_token; + session->reply.pub.session_token = session->token; + + _flags = (unsigned)xmit_open->flags; + _maxaddr = (haddr_t)xmit_open->maxaddr; + + /* Check whether the native size_t on the remote machine (Driver) is larger + * than that on the local machine; if so, issue a warning. + * The blob is always an 8-byte bitfield -- check its contents. + */ + if (xmit_open->size_t_blob > (uint64_t)((size_t)(-1))) { + mirror_log(session->loginfo, V_WARN, + "Driver size_t is larger than our own"); + } + + mirror_log(session->loginfo, V_INFO, + "to open file %s (flags %d) (maxaddr %d)", + xmit_open->filename, _flags, _maxaddr); + + /* Explicitly use Sec2 as the underlying driver for now. + */ + fapl_id = H5Pcreate(H5P_FILE_ACCESS); + if (fapl_id < 0) { + mirror_log(session->loginfo, V_ERR, "can't create FAPL"); + reply_error(session, "H5Pcreate() failure"); + goto error; + } + if (H5Pset_fapl_sec2(fapl_id) < 0) { + mirror_log(session->loginfo, V_ERR, "can't set FAPL as Sec2"); + reply_error(session, "H5Pset_fapl_sec2() failure"); + goto error; + } + + session->file = H5FDopen(xmit_open->filename, _flags, fapl_id, _maxaddr); + if (NULL == session->file) { + mirror_log(session->loginfo, V_ERR, "H5FDopen()"); + reply_error(session, "remote H5FDopen() failure"); + goto error; + } + + /* FAPL is set and in use; clean up */ + if (H5Pclose(fapl_id) < 0) { + mirror_log(session->loginfo, V_ERR, "can't set FAPL as Sec2"); + reply_error(session, "H5Pset_fapl_sec2() failure"); + goto error; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; + +error: + if (fapl_id > 0) { + H5E_BEGIN_TRY { + (void)H5Pclose(fapl_id); + } H5E_END_TRY; + } + return -1; +} /* end do_open() */ + + +/* --------------------------------------------------------------------------- + * Function: do_set_eoa + * + * Purpose: Handle a SET_EOA operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_set_eoa(struct mirror_session *session, + const unsigned char *xmit_buf) +{ + size_t decode_ret = 0; + H5FD_mirror_xmit_eoa_t xmit_seoa; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf); + + mirror_log(session->loginfo, V_INFO, "do_set_eoa()"); + + decode_ret = H5FD_mirror_xmit_decode_set_eoa(&xmit_seoa, xmit_buf); + if (H5FD_MIRROR_XMIT_EOA_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, "can't decode set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decoding size failure"); + return -1; + } + + if (!H5FD_mirror_xmit_is_set_eoa(&xmit_seoa)) { + mirror_log(session->loginfo, V_ERR, "not a set-eoa xmit"); + reply_error(session, "remote xmit_eoa_t decode failure"); + return -1; + } + + mirror_log(session->loginfo, V_INFO, "set EOA addr %d", + xmit_seoa.eoa_addr); + + if (H5FDset_eoa(session->file, (H5FD_mem_t)xmit_seoa.type, + (haddr_t)xmit_seoa.eoa_addr) + < 0) + { + mirror_log(session->loginfo, V_ERR, "H5FDset_eoa()"); + reply_error(session, "remote H5FDset_eoa() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_set_eoa() */ + + +/* --------------------------------------------------------------------------- + * Function: do_truncate + * + * Purpose: Handle a TRUNCATE operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_truncate(struct mirror_session *session) +{ + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "do_truncate()"); + + /* default DXPL ID (0), 0 for "FALSE" closing -- both probably unused */ + if (H5FDtruncate(session->file, 0, 0) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDtruncate()"); + reply_error(session, "remote H5FDtruncate() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_truncate() */ + + +/* --------------------------------------------------------------------------- + * Function: do_unlock + * + * Purpose: Handle an UNLOCK operation. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_unlock(struct mirror_session *session) +{ + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "do_unlock()"); + + if (H5FDunlock(session->file) < 0) { + mirror_log(session->loginfo, V_ERR, "H5FDunlock()"); + reply_error(session, "remote H5FDunlock() failure"); + return -1; + } + + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_unlock() */ + + +/* --------------------------------------------------------------------------- + * Function: do_write + * + * Purpose: Handle a WRITE operation. + * Receives command, replies; receives & writes data, replies. + * + * It is known that this results in suboptimal performance, + * but handling both small and very, very large write buffers + * with a single "over the wire" exchange + * poses design challenges not worth tackling as of March 2020. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +do_write(struct mirror_session *session, + const unsigned char *xmit_buf) +{ + size_t decode_ret = 0; + haddr_t addr = 0; + haddr_t sum_bytes_written = 0; + H5FD_mem_t type = 0; + char *buf = NULL; + ssize_t nbytes_in_packet = 0; + H5FD_mirror_xmit_write_t xmit_write; + + HDassert(session && (session->magic == MW_SESSION_MAGIC) && xmit_buf); + + mirror_log(session->loginfo, V_INFO, "do_write()"); + + if (NULL == session->file) { + mirror_log(session->loginfo, V_ERR, "no open file!"); + reply_error(session, "no file open on remote"); + return -1; + } + + decode_ret = H5FD_mirror_xmit_decode_write(&xmit_write, xmit_buf); + if (H5FD_MIRROR_XMIT_WRITE_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, "can't decode write xmit"); + reply_error(session, "remote xmit_write_t decoding size failure"); + return -1; + } + + if (!H5FD_mirror_xmit_is_write(&xmit_write)) { + mirror_log(session->loginfo, V_ERR, "not a write xmit"); + reply_error(session, "remote xmit_write_t decode failure"); + return -1; + } + + addr = (haddr_t)xmit_write.offset; + type = (H5FD_mem_t)xmit_write.type; + + /* Allocate the buffer once -- re-use between loops. + */ + buf = (char *)HDmalloc(sizeof(char) * H5FD_MIRROR_DATA_BUFFER_MAX); + if (NULL == buf) { + mirror_log(session->loginfo, V_ERR, "can't allocate databuffer"); + reply_error(session, "can't allocate buffer for receiving data"); + return -1; + } + + /* got write signal; ready for data */ + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + mirror_log(session->loginfo, V_INFO, "to write %zu bytes at %zu", + xmit_write.size, + addr); + + /* The given write may be: + * 1. larger than the allowed single buffer size + * 2. larger than the native size_t of this system + * + * Handle all cases by looping, ingesting as much of the stream as possible + * and writing that part to the file. + */ + sum_bytes_written = 0; + do { + nbytes_in_packet = HDread(session->sockfd, buf, + H5FD_MIRROR_DATA_BUFFER_MAX); + if (-1 == nbytes_in_packet) { + mirror_log(session->loginfo, V_ERR, "can't read into databuffer"); + reply_error(session, "can't read data buffer"); + return -1; + } + + mirror_log(session->loginfo, V_INFO, "received %zd bytes", + nbytes_in_packet); + if (HEXDUMP_WRITEDATA) { + mirror_log(session->loginfo, V_ALL, "DATA:\n```"); + mirror_log_bytes(session->loginfo, V_ALL, nbytes_in_packet, + (const unsigned char *)buf); + mirror_log(session->loginfo, V_ALL, "```"); + } + + mirror_log(session->loginfo, V_INFO, "writing %zd bytes at %zu", + nbytes_in_packet, + (addr + sum_bytes_written)); + + if (H5FDwrite(session->file, type, H5P_DEFAULT, + (addr + sum_bytes_written), (size_t)nbytes_in_packet, buf) + < 0) + { + mirror_log(session->loginfo, V_ERR, "H5FDwrite()"); + reply_error(session, "remote H5FDwrite() failure"); + return -1; + } + + sum_bytes_written += (haddr_t)nbytes_in_packet; + + } while (sum_bytes_written < xmit_write.size); /* end while ingesting */ + + HDfree(buf); + + /* signal that we're done here and a-ok */ + if (reply_ok(session) < 0) { + mirror_log(session->loginfo, V_ERR, "can't reply"); + reply_error(session, "ok reply failed; session contaminated"); + return -1; + } + + return 0; +} /* end do_write() */ + + +/* --------------------------------------------------------------------------- + * Function: receive_communique + * + * Purpose: Accept bytes from the socket, check for emergency shutdown, and + * sanity-check received bytes. + * The raw bytes read are stored in the sock_comm structure at + * comm->raw. + * The raw bytes are decoded and a xmit_t (header) struct pointer + * in comm is populated at comm->xmit_recd. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +receive_communique( + struct mirror_session *session, + struct sock_comm *comm) +{ + ssize_t read_ret = 0; + size_t decode_ret; + H5FD_mirror_xmit_t *X = comm->xmit_recd; + + HDassert((session != NULL) && \ + (session->magic == MW_SESSION_MAGIC) && \ + (comm != NULL) && \ + (comm->magic == MW_SOCK_COMM_MAGIC) && \ + (comm->xmit_recd != NULL) && \ + (comm->raw != NULL) && \ + (comm->raw_size >= H5FD_MIRROR_XMIT_BUFFER_MAX) ); + + mirror_log(session->loginfo, V_INFO, "receive_communique()"); + + HDbzero(comm->raw, comm->raw_size); + comm->recd_die = 0; + + mirror_log(session->loginfo, V_INFO, "ready to receive"); /* TODO */ + + read_ret = HDread(session->sockfd, comm->raw, H5FD_MIRROR_XMIT_BUFFER_MAX); + if (-1 == read_ret) { + mirror_log(session->loginfo, V_ERR, "read:%zd", read_ret); + goto error; + } + + mirror_log(session->loginfo, V_INFO, "received %zd bytes", read_ret); + if (HEXDUMP_XMITS) { + mirror_log(session->loginfo, V_ALL, "```", read_ret); + mirror_log_bytes(session->loginfo, V_ALL, (size_t)read_ret, + (const unsigned char *)comm->raw); + mirror_log(session->loginfo, V_ALL, "```"); + } /* end if hexdump transmissions received */ + + /* old-fashioned manual kill (for debugging) */ + if (!HDstrncmp("GOODBYE", comm->raw, 7)) { + mirror_log(session->loginfo, V_INFO, "received GOODBYE"); + comm->recd_die = 1; + goto done; + } + + decode_ret = H5FD_mirror_xmit_decode_header(X, + (const unsigned char *)comm->raw); + if (H5FD_MIRROR_XMIT_HEADER_SIZE != decode_ret) { + mirror_log(session->loginfo, V_ERR, + "header decode size mismatch: expected (%z), got (%z)", + H5FD_MIRROR_XMIT_HEADER_SIZE, decode_ret); + /* Try to tell Driver that it should stop */ + reply_error(session, "xmit size mismatch"); + goto error; + } + + if (!H5FD_mirror_xmit_is_xmit(X)) { + mirror_log(session->loginfo, V_ERR, "bad magic: 0x%X", X->magic); + /* Try to tell Driver that it should stop */ + reply_error(session, "bad magic"); + goto error; + } + + if (session->xmit_count != X->xmit_count) { + mirror_log(session->loginfo, V_ERR, + "xmit_count mismatch exp:%d recd:%d", + session->xmit_count, X->xmit_count); + /* Try to tell Driver that it should stop */ + reply_error(session, "xmit_count mismatch"); + goto error; + } + + if ( (session->token > 0) && (session->token != X->session_token) ) { + mirror_log(session->loginfo, V_ERR, "wrong session"); + /* Try to tell Driver that it should stop */ + reply_error(session, "wrong session"); + goto error; + } + + session->xmit_count++; + +done: + return 0; + +error: + return -1; +} /* end receive_communique() */ + + +/* --------------------------------------------------------------------------- + * Function: process_instructions + * + * Purpose: Receive and handle all instructions from Driver. + * + * Return: 0 on success, -1 if error. + * --------------------------------------------------------------------------- + */ +static int +process_instructions(struct mirror_session *session) +{ + struct sock_comm comm; + char xmit_buf[H5FD_MIRROR_XMIT_BUFFER_MAX]; /* raw bytes */ + H5FD_mirror_xmit_t xmit_recd; /* for decoded xmit header */ + + HDassert(session && (session->magic == MW_SESSION_MAGIC)); + + mirror_log(session->loginfo, V_INFO, "process_instructions()"); + + comm.magic = MW_SOCK_COMM_MAGIC; + comm.recd_die = 0; /* Flag for program to terminate */ + comm.xmit_recd = &xmit_recd; + comm.raw = xmit_buf; + comm.raw_size = sizeof(xmit_buf); + + while (1) { /* sill-listening infinite loop */ + + /* Use convenience structure for raw/decoded info in/out */ + if (receive_communique(session, &comm) < 0) { + mirror_log(session->loginfo, V_ERR, "problem reading socket"); + return -1; + } + + if (comm.recd_die) { + goto done; + } + + switch(xmit_recd.op) { + case H5FD_MIRROR_OP_CLOSE: + if (do_close(session) < 0) { + return -1; + } + goto done; + case H5FD_MIRROR_OP_LOCK: + if (do_lock(session, (const unsigned char *)xmit_buf) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_OPEN: + mirror_log(session->loginfo, V_ERR, "OPEN xmit during session"); + reply_error(session, "illegal OPEN xmit during session"); + return -1; + case H5FD_MIRROR_OP_SET_EOA: + if (do_set_eoa(session, (const unsigned char *)xmit_buf) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_TRUNCATE: + if (do_truncate(session) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_UNLOCK: + if (do_unlock(session) < 0) { + return -1; + } + break; + case H5FD_MIRROR_OP_WRITE: + if (do_write(session, (const unsigned char *)xmit_buf) < 0) { + return -1; + } + break; + default: + mirror_log(session->loginfo, V_ERR, "unrecognized transmission"); + reply_error(session, "unrecognized transmission"); + return -1; + } /* end switch (xmit_recd.op) */ + + } /* end while still listening */ + +done: + comm.magic = 0; /* invalidate structure, on principle */ + xmit_recd.magic = 0; /* invalidate structure, on principle */ + return 0; +} /* end process_instructions() */ + + +/* ------------------------------------------------------------------------- */ +herr_t +run_writer(int socketfd, H5FD_mirror_xmit_open_t *xmit_open) +{ + struct mirror_session *session = NULL; + int ret_value = SUCCEED; + + session = session_start(socketfd, xmit_open); + if (NULL == session) { + mirror_log(NULL, V_ERR, "Can't start session -- aborting"); + ret_value = FAIL; + } + else { + if (process_instructions(session) < 0) { + mirror_log(session->loginfo, V_ERR, + "problem processing instructions"); + ret_value = FAIL; + } + if (session_stop(session) < 0) { + mirror_log(NULL, V_ERR, "Can't stop session -- going down hard"); + ret_value = FAIL; + } + } + + return ret_value; +} /* end run_writer */ + +#endif /* H5_HAVE_MIRROR_VFD */ + |