/* * Copyright © 2000 The author. * The author prefers this code not be used for military purposes. * * * Author: Thomas Radke * Tuesday, September 12, 2000 * * Purpose: This code provides the Stream Virtual File Driver. * It is very much based on the core VFD which keeps an * entire HDF5 data file to be processed in main memory. * In addition to that, the memory image of the file is * read from/written to a socket during an open/flush operation. * * Version: $Id$ * * Modifications: * Thomas Radke, Thursday, October 26, 2000 * Added support for Windows. * Catch SIGPIPE on an open socket. * */ #include "H5private.h" /* library function */ /* Only build this driver if it was configured with --with-Stream-VFD */ #ifdef H5_HAVE_STREAM #include "H5Eprivate.h" /* error handling */ #include "H5FDpublic.h" /* Public VFD header */ #include "H5FDstream.h" /* Stream VFD header */ #include "H5Iprivate.h" /* IDs */ #include "H5MMprivate.h" /* memory allocation */ #include "H5Pprivate.h" /* property prototypes */ #ifdef H5FD_STREAM_HAVE_UNIX_SOCKETS #ifdef H5_HAVE_SYS_TYPES_H #include /* socket stuff */ #endif #ifdef H5_HAVE_SYS_SOCKET_H #include /* socket stuff */ #endif #include /* gethostbyname */ #include /* socket stuff */ #ifdef H5_HAVE_NETINET_TCP_H #include /* socket stuff */ #endif #ifdef H5_HAVE_SYS_FILIO_H #include /* socket stuff */ #endif #endif #ifndef H5_HAVE_SOCKLEN_T typedef int socklen_t; #endif /* Some useful macros */ #ifdef MIN #undef MIN #endif #ifdef MAX #undef MAX #endif #define MIN(x,y) ((x) < (y) ? (x) : (y)) #define MAX(x,y) ((x) > (y) ? (x) : (y)) /* Uncomment this to switch on debugging output */ /* #define DEBUG 1 */ /* Define some socket stuff which is different for UNIX and Windows */ #ifdef H5FD_STREAM_HAVE_UNIX_SOCKETS #define H5FD_STREAM_CLOSE_SOCKET(a) close(a) #define H5FD_STREAM_IOCTL_SOCKET(a, b, c) ioctl(a, b, c) #define H5FD_STREAM_ERROR_CHECK(rc) ((rc) < 0) #else #define H5FD_STREAM_CLOSE_SOCKET(a) closesocket (a) #define H5FD_STREAM_IOCTL_SOCKET(a, b, c) ioctlsocket (a, b, (u_long *) (c)) #define H5FD_STREAM_ERROR_CHECK(rc) ((rc) == (SOCKET) (SOCKET_ERROR)) #endif /* The driver identification number, initialized at runtime */ static hid_t H5FD_STREAM_g = 0; /* * The description of a file belonging to this driver. The `eoa' and `eof' * determine the amount of hdf5 address space in use and the high-water mark * of the file (the current size of the underlying memory). */ typedef struct H5FD_stream_t { H5FD_t pub; /* public stuff, must be first */ H5FD_stream_fapl_t fapl; /* file access property list */ unsigned char *mem; /* the underlying memory */ haddr_t eoa; /* end of allocated region */ haddr_t eof; /* current allocated size */ H5FD_STREAM_SOCKET_TYPE socket; /* socket to write / read from */ hbool_t dirty; /* flag indicating unflushed data */ hbool_t internal_socket; /* flag indicating an internal socket */ } H5FD_stream_t; /* Allocate memory in multiples of this size (in bytes) by default */ #define H5FD_STREAM_INCREMENT 8192 /* default backlog argument for listen call */ #define H5FD_STREAM_BACKLOG 1 /* number of successive ports to hunt for until bind(2) succeeds (default 0 means no port hunting - only try the one given in the filename) */ #define H5FD_STREAM_MAXHUNT 0 /* default file access property list */ static const H5FD_stream_fapl_t default_fapl = { H5FD_STREAM_INCREMENT, /* address space allocation blocksize */ H5FD_STREAM_INVALID_SOCKET, /* no external socket descriptor */ TRUE, /* enable I/O on socket */ H5FD_STREAM_BACKLOG, /* default backlog for listen(2) */ NULL, /* do not broadcast received files */ NULL, /* argument to READ broadcast routine */ H5FD_STREAM_MAXHUNT, /* default number of ports to hunt */ 0 /* unknown port for unbound socket */ }; /* * These macros check for overflow of various quantities. These macros * assume that file_offset_t is signed and haddr_t and size_t are unsigned. * * ADDR_OVERFLOW: Checks whether a file address of type `haddr_t' * is too large to be represented by the second argument * of the file seek function. * * SIZE_OVERFLOW: Checks whether a buffer size of type `hsize_t' is too * large to be represented by the `size_t' type. * * REGION_OVERFLOW: Checks whether an address and size pair describe data * which can be addressed entirely in memory. */ #ifdef H5_HAVE_LSEEK64 # define file_offset_t off64_t #else # define file_offset_t off_t #endif #define MAXADDR (((haddr_t)1<<(8*sizeof(file_offset_t)-1))-1) #define ADDR_OVERFLOW(A) (HADDR_UNDEF==(A) || \ ((A) & ~(haddr_t)MAXADDR)) #define SIZE_OVERFLOW(Z) ((Z) & ~(hsize_t)MAXADDR) #define REGION_OVERFLOW(A,Z) (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || \ HADDR_UNDEF==(A)+(Z) || \ (size_t)((A)+(Z))<(size_t)(A)) /* Function prototypes */ static void *H5FD_stream_fapl_get (H5FD_t *_stream); static H5FD_t *H5FD_stream_open (const char *name, unsigned flags, hid_t fapl_id, haddr_t maxaddr); static herr_t H5FD_stream_flush (H5FD_t *_stream, hbool_t closing); static herr_t H5FD_stream_close (H5FD_t *_stream); static herr_t H5FD_stream_query(const H5FD_t *_f1, unsigned long *flags); static haddr_t H5FD_stream_get_eoa (H5FD_t *_stream); static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, haddr_t addr); static haddr_t H5FD_stream_get_eof (H5FD_t *_stream); static herr_t H5FD_stream_read (H5FD_t *_stream, H5FD_mem_t type, hid_t fapl_id, haddr_t addr, size_t size, void *buf); static herr_t H5FD_stream_write (H5FD_t *_stream, H5FD_mem_t type, hid_t fapl_id, haddr_t addr, size_t size, const void *buf); /* The Stream VFD's class information structure */ static const H5FD_class_t H5FD_stream_g = { "stream", /* name */ MAXADDR, /* maxaddr */ H5F_CLOSE_WEAK, /* fc_degree */ NULL, /* sb_size */ NULL, /* sb_encode */ NULL, /* sb_decode */ sizeof (H5FD_stream_fapl_t), /* fapl_size */ H5FD_stream_fapl_get, /* fapl_get */ NULL, /* fapl_copy */ NULL, /* fapl_free */ 0, /* dxpl_size */ NULL, /* dxpl_copy */ NULL, /* dxpl_free */ H5FD_stream_open, /* open */ H5FD_stream_close, /* close */ NULL, /* cmp */ H5FD_stream_query, /* query */ NULL, /* alloc */ NULL, /* free */ H5FD_stream_get_eoa, /* get_eoa */ H5FD_stream_set_eoa, /* set_eoa */ H5FD_stream_get_eof, /* get_eof */ H5FD_stream_read, /* read */ H5FD_stream_write, /* write */ H5FD_stream_flush, /* flush */ H5FD_FLMAP_SINGLE, /* fl_map */ }; /* Interface initialization */ #define INTERFACE_INIT H5FD_stream_init static int interface_initialize_g = 0; /*------------------------------------------------------------------------- * Function: H5FD_stream_init * * Purpose: Initialize this driver by registering it with the library. * * Return: Success: The driver ID for the Stream driver. * Failure: Negative. * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ hid_t H5FD_stream_init (void) { FUNC_ENTER (H5FD_stream_init, FAIL); if (H5I_VFL != H5Iget_type (H5FD_STREAM_g)) { H5FD_STREAM_g = H5FDregister (&H5FD_stream_g); /* set the process signal mask to ignore SIGPIPE signals */ /* NOTE: Windows doesn't know SIGPIPE signals that's why the #ifdef */ #ifdef SIGPIPE if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) { fprintf (stderr, "Stream VFD warning: failed to set the process signal " "mask to ignore SIGPIPE signals\n"); } #endif } FUNC_LEAVE (H5FD_STREAM_g); } /*------------------------------------------------------------------------- * Function: H5Pset_fapl_stream * * Purpose: Modify the file access property list to use the Stream * driver defined in this source file. The INCREMENT specifies * how much to grow the memory each time we need more. * If a valid socket argument is given this will be used * by the driver instead of parsing the 'hostname:port' filename * and opening a socket internally. * * Return: Success: Non-negative * Failure: Negative * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ herr_t H5Pset_fapl_stream (hid_t fapl_id, H5FD_stream_fapl_t *fapl) { H5FD_stream_fapl_t user_fapl; H5P_genplist_t *plist; /* Property list pointer */ herr_t result; FUNC_ENTER (H5Pset_fapl_stream, FAIL); H5TRACE2 ("e", "ix", fapl_id, fapl); if(TRUE!=H5P_isa_class(fapl_id,H5P_FILE_ACCESS) || NULL == (plist = H5I_object(fapl_id))) HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); if (fapl) { if (! fapl->do_socket_io && fapl->broadcast_fn == NULL) { HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, FAIL, "read broadcast function pointer is NULL"); } user_fapl = *fapl; if (fapl->increment == 0) { user_fapl.increment = H5FD_STREAM_INCREMENT; } user_fapl.port = 0; result = H5P_set_driver (plist, H5FD_STREAM, &user_fapl); } else { result = H5P_set_driver (plist, H5FD_STREAM, &default_fapl); } FUNC_LEAVE (result); } /*------------------------------------------------------------------------- * Function: H5Pget_fapl_stream * * Purpose: Queries properties set by the H5Pset_fapl_stream() function. * * Return: Success: Non-negative * Failure: Negative * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ herr_t H5Pget_fapl_stream(hid_t fapl_id, H5FD_stream_fapl_t *fapl /* out */) { H5FD_stream_fapl_t *this_fapl; H5P_genplist_t *plist; /* Property list pointer */ FUNC_ENTER (H5Pget_fapl_stream, FAIL); H5TRACE2("e","ix",fapl_id,fapl); if(TRUE!=H5P_isa_class(fapl_id,H5P_FILE_ACCESS) || NULL == (plist = H5I_object(fapl_id))) { HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); } if (H5FD_STREAM != H5P_get_driver (plist)) { HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "incorrect VFL driver"); } if (NULL == (this_fapl = H5P_get_driver_info (plist))) { HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "bad VFL driver info"); } if (fapl) { *fapl = *this_fapl; } FUNC_LEAVE (SUCCEED); } /*------------------------------------------------------------------------- * Function: H5FD_stream_fapl_get * * Purpose: Returns a copy of the file access properties * * Return: Success: Ptr to new file access properties * Failure: NULL * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static void *H5FD_stream_fapl_get (H5FD_t *_stream) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; H5FD_stream_fapl_t *fapl; FUNC_ENTER (H5FD_stream_fapl_get, NULL); if ((fapl = H5MM_calloc (sizeof (H5FD_stream_fapl_t))) == NULL) { HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, NULL, "memory allocation failed"); } *fapl = stream->fapl; FUNC_LEAVE (fapl); } static H5FD_STREAM_SOCKET_TYPE H5FDstream_open_socket (const char *filename, int o_flags, H5FD_stream_fapl_t *fapl, const char **errormsg, H5E_major_t *major, H5E_minor_t *minor) { struct sockaddr_in server; struct hostent *he; H5FD_STREAM_SOCKET_TYPE sock; char *hostname; unsigned short int first_port; const char *separator, *tmp; int on = 1; sock = H5FD_STREAM_INVALID_SOCKET; *errormsg = NULL; /* Parse "hostname:port" from filename argument */ for (separator = filename; *separator != ':' && *separator; separator++); if (separator == filename || !*separator) { *errormsg = "invalid host address"; } else { tmp = separator; if (! tmp[1]) { *errormsg = "no port number"; } while (*errormsg == NULL && *++tmp) { if (! isdigit (*tmp)) { *errormsg = "invalid port number"; } } } /* Return if parsing the filename failed */ if (*errormsg) { *major = H5E_ARGS; *minor = H5E_BADVALUE; return (sock); } hostname = (char *) H5MM_malloc ((size_t)(separator - filename + 1)); /* Return if out of memory */ if (hostname == NULL) { *major = H5E_RESOURCE; *minor = H5E_NOSPACE; *errormsg = "memory allocation failed"; return (sock); } HDstrncpy (hostname, filename, (size_t)(separator - filename)); hostname[separator - filename] = 0; fapl->port = atoi (separator + 1); HDmemset (&server, 0, sizeof (server)); server.sin_family = AF_INET; server.sin_port = htons (fapl->port); if (! (he = gethostbyname (hostname))) { *errormsg = "unable to get host address"; } else if (H5FD_STREAM_ERROR_CHECK (sock = socket (AF_INET, SOCK_STREAM, 0))) { *errormsg = "unable to open socket"; } if (*errormsg == NULL) { if (O_RDONLY == o_flags) { HDmemcpy (&server.sin_addr, he->h_addr, (size_t)he->h_length); #ifdef DEBUG fprintf (stderr, "Stream VFD: connecting to host '%s' port %d\n", hostname, fapl->port); #endif if (connect (sock, (struct sockaddr *) &server, sizeof (server)) < 0) { *errormsg = "unable to connect"; } } else { server.sin_addr.s_addr = INADDR_ANY; if (H5FD_STREAM_IOCTL_SOCKET (sock, FIONBIO, &on) < 0) { *errormsg = "unable to set non-blocking mode for socket"; } else if (setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (const char *) &on, sizeof(on)) < 0) { *errormsg = "unable to set socket option TCP_NODELAY"; } else if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, sizeof(on)) < 0) { *errormsg = "unable to set socket option SO_REUSEADDR"; } else { /* Try to bind the socket to the given port. If maxhunt is given try some successive ports also. */ first_port = fapl->port; while (fapl->port <= first_port + fapl->maxhunt) { #ifdef DEBUG fprintf (stderr, "Stream VFD: binding to port %d\n", fapl->port); #endif server.sin_port = htons (fapl->port); if (bind (sock, (struct sockaddr *) &server, sizeof (server)) < 0) { fapl->port++; } else { break; } } if (fapl->port > first_port + fapl->maxhunt) { fapl->port = 0; *errormsg = "unable to bind socket"; } else if (listen (sock, fapl->backlog) < 0) { *errormsg = "unable to listen on socket"; } } } } H5MM_xfree (hostname); /* Return if opening the socket failed */ if (*errormsg) { if (! H5FD_STREAM_ERROR_CHECK (sock)) { H5FD_STREAM_CLOSE_SOCKET (sock); sock = H5FD_STREAM_INVALID_SOCKET; } *major = H5E_FILE; *minor = H5E_CANTOPENFILE; } return (sock); } static void H5FDstream_read_from_socket (H5FD_stream_t *stream, const char **errormsg, H5E_major_t *major, H5E_minor_t *minor) { int size; size_t max_size = 0; unsigned char *ptr; ptr = NULL; *errormsg = NULL; stream->eof = 0; stream->mem = NULL; while (1) { if (max_size <= 0) { /* * Allocate initial buffer as increment + 1 * to prevent unnecessary reallocation * if increment is exactly a multiple of the filesize */ max_size = stream->fapl.increment; if (! stream->mem) { max_size++; } ptr = H5MM_realloc (stream->mem, (size_t) (stream->eof + max_size)); if (! ptr) { *major = H5E_RESOURCE; *minor = H5E_NOSPACE; *errormsg = "unable to allocate file space buffer"; return; } stream->mem = ptr; ptr += stream->eof; } /* now receive the next chunk of data */ size = recv (stream->socket, ptr, max_size, 0); if (size < 0 && (EINTR == errno || EAGAIN == errno || EWOULDBLOCK)) { continue; } if (size < 0) { *major = H5E_IO; *minor = H5E_READERROR; *errormsg = "error reading from file from socket"; return; } if (! size) { break; } max_size -= (size_t) size; stream->eof += (haddr_t) size; ptr += size; #ifdef DEBUG fprintf (stderr, "Stream VFD: read %d bytes (%d total) from socket\n", size, (int) stream->eof); #endif } #ifdef DEBUG fprintf (stderr, "Stream VFD: read total of %d bytes from socket\n", (int) stream->eof); #endif } /*------------------------------------------------------------------------- * Function: H5FD_stream_open * * Purpose: Opens an HDF5 file in memory. * * Return: Success: A pointer to a new file data structure. The * public fields will be initialized by the * caller, which is always H5FD_open(). * Failure: NULL * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static H5FD_t *H5FD_stream_open (const char *filename, unsigned flags, hid_t fapl_id, haddr_t maxaddr) { H5FD_stream_t _stream, *stream; const H5FD_stream_fapl_t *fapl; int o_flags; H5E_major_t major; H5E_minor_t minor; const char *errormsg; #ifdef WIN32 WSADATA wsadata; #endif H5P_genplist_t *plist; /* Property list pointer */ FUNC_ENTER (H5FD_stream_open, NULL); /* Check arguments */ if (filename == NULL|| *filename == '\0') { HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, NULL,"invalid file name"); } if (maxaddr == 0 || HADDR_UNDEF == maxaddr) { HRETURN_ERROR (H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr"); } if (ADDR_OVERFLOW (maxaddr)) { HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, NULL, "maxaddr overflow"); } /* Build the open flags */ o_flags = (H5F_ACC_RDWR & flags) ? O_RDWR : O_RDONLY; if (H5F_ACC_TRUNC & flags) o_flags |= O_TRUNC; if (H5F_ACC_CREAT & flags) o_flags |= O_CREAT; if (H5F_ACC_EXCL & flags) o_flags |= O_EXCL; if ((O_RDWR & o_flags) && ! (O_CREAT & o_flags)) { HRETURN_ERROR (H5E_ARGS, H5E_UNSUPPORTED, NULL, "open stream for read/write not supported"); } #ifdef WIN32 if (WSAStartup (MAKEWORD (2, 0), &wsadata)) { HRETURN_ERROR (H5E_IO, H5E_CANTINIT, NULL, "Couldn't start Win32 socket layer"); } #endif fapl = NULL; if (H5P_DEFAULT != fapl_id) { if(TRUE!=H5P_isa_class(fapl_id,H5P_FILE_ACCESS) || NULL == (plist = H5I_object(fapl_id))) HRETURN_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a file access property list"); fapl = H5P_get_driver_info (plist); } if (fapl == NULL) { fapl = &default_fapl; } /* zero out file structure and set file access property list */ HDmemset (&_stream, 0, sizeof (_stream)); _stream.fapl = *fapl; errormsg = NULL; /* if an external socket is provided with the file access property list we use that, otherwise the filename argument is parsed and a socket is opened internally */ if (fapl->do_socket_io) { if (! H5FD_STREAM_ERROR_CHECK (fapl->socket)) { _stream.internal_socket = FALSE; _stream.socket = fapl->socket; } else { _stream.internal_socket = TRUE; _stream.socket = H5FDstream_open_socket (filename, o_flags, &_stream.fapl, &errormsg, &major, &minor); if (_stream.socket != H5FD_STREAM_INVALID_SOCKET) { /* update the port ID in the file access property so that it can be queried via H5P_get_fapl_stream() later on */ H5P_set_driver (plist, H5FD_STREAM, &_stream.fapl); } } } else { _stream.socket = H5FD_STREAM_INVALID_SOCKET; } /* read the data from socket into memory */ if (O_RDONLY == o_flags) { if (errormsg == NULL && fapl->do_socket_io) { #ifdef DEBUG fprintf (stderr, "Stream VFD: reading file from socket\n"); #endif H5FDstream_read_from_socket (&_stream, &errormsg, &major, &minor); } /* Now call the user's broadcast routine if given */ if (fapl->broadcast_fn) { if ((fapl->broadcast_fn) (&_stream.mem, &_stream.eof, fapl->broadcast_arg) < 0) { /* don't override previous error codes */ if (errormsg == NULL) { major = H5E_IO; minor = H5E_READERROR; errormsg = "broadcast error"; } } /* check for filesize of zero bytes */ if (errormsg == NULL && _stream.eof == 0) { major = H5E_IO; minor = H5E_READERROR; errormsg = "zero filesize"; } } /* For files which are read from a socket: the opened socket is not needed anymore */ if (errormsg == NULL) { if (_stream.internal_socket && ! H5FD_STREAM_ERROR_CHECK (_stream.socket)) { H5FD_STREAM_CLOSE_SOCKET (_stream.socket); } _stream.socket = H5FD_STREAM_INVALID_SOCKET; } } /* Create the new file struct */ stream = NULL; if (errormsg == NULL) { stream = (H5FD_stream_t *) H5MM_calloc (sizeof (H5FD_stream_t)); if (stream == NULL) { major = H5E_RESOURCE; minor = H5E_NOSPACE; errormsg = "unable to allocate file struct"; } else { *stream = _stream; } } if (errormsg) { if (_stream.mem) { H5MM_xfree (_stream.mem); } if (_stream.internal_socket && ! H5FD_STREAM_ERROR_CHECK (_stream.socket)) { H5FD_STREAM_CLOSE_SOCKET (_stream.socket); } HRETURN_ERROR (major, minor, NULL, errormsg); } FUNC_LEAVE ((H5FD_t *) stream); } /*------------------------------------------------------------------------- * Function: H5FD_stream_flush * * Purpose: Flushes the file via sockets to any connected clients * if its dirty flag is set. * * Return: Success: 0 * Failure: -1 * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static herr_t H5FD_stream_flush (H5FD_t *_stream, hbool_t UNUSED closing) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; size_t size; ssize_t bytes_send; int on = 1; unsigned char *ptr; struct sockaddr from; socklen_t fromlen; H5FD_STREAM_SOCKET_TYPE sock; FUNC_ENTER (H5FD_stream_flush, FAIL); /* Write to backing store */ if (stream->dirty && ! H5FD_STREAM_ERROR_CHECK (stream->socket)) { #ifdef DEBUG fprintf (stderr, "Stream VFD: accepting client connections\n"); #endif fromlen = sizeof (from); while (! H5FD_STREAM_ERROR_CHECK (sock = accept (stream->socket, &from, &fromlen))) { if (H5FD_STREAM_IOCTL_SOCKET (sock, FIONBIO, &on) < 0) { H5FD_STREAM_CLOSE_SOCKET (sock); continue; /* continue the loop for other clients to connect */ } size = stream->eof; ptr = stream->mem; while (size) { bytes_send = send (sock, ptr, size, 0); if (bytes_send < 0) { if (EINTR == errno || EAGAIN == errno || EWOULDBLOCK == errno) { continue; } /* continue the outermost loop for other clients to connect */ break; } ptr += bytes_send; size -= bytes_send; #ifdef DEBUG fprintf (stderr, "Stream VFD: wrote %d bytes to socket, %d in total, " "%d left\n", bytes_send, (int) (ptr - stream->mem), size); #endif } H5FD_STREAM_CLOSE_SOCKET (sock); } stream->dirty = FALSE; } FUNC_LEAVE (SUCCEED); } /*------------------------------------------------------------------------- * Function: H5FD_stream_close * * Purpose: Closes the file. * * Return: Success: 0 * Failure: -1 * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static herr_t H5FD_stream_close (H5FD_t *_stream) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; FUNC_ENTER (H5FD_stream_close, FAIL); /* Flush */ if (H5FD_stream_flush (_stream, TRUE) != SUCCEED) { HRETURN_ERROR (H5E_FILE, H5E_CANTFLUSH, FAIL, "unable to flush file"); } /* Release resources */ if (! H5FD_STREAM_ERROR_CHECK (stream->socket) && stream->internal_socket) { H5FD_STREAM_CLOSE_SOCKET (stream->socket); } if (stream->mem) { H5MM_xfree (stream->mem); } HDmemset (stream, 0, sizeof (H5FD_stream_t)); H5MM_xfree (stream); FUNC_LEAVE (0); } /*------------------------------------------------------------------------- * Function: H5FD_stream_query * * Purpose: Set the flags that this VFL driver is capable of supporting. * (listed in H5FDpublic.h) * * Return: Success: non-negative * * Failure: negative * * Programmer: Quincey Koziol * Tuesday, September 26, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static herr_t H5FD_stream_query(const H5FD_t * UNUSED _f, unsigned long *flags/*out*/) { FUNC_ENTER (H5FD_stream_query, SUCCEED); /* Set the VFL feature flags that this driver supports */ if (flags) { *flags = 0; /* OK to perform data sieving for faster raw data reads & writes */ *flags |= H5FD_FEAT_DATA_SIEVE; } FUNC_LEAVE (SUCCEED); } /*------------------------------------------------------------------------- * Function: H5FD_stream_get_eoa * * Purpose: Gets the end-of-address marker for the file. The EOA marker * is the first address past the last byte allocated in the * format address space. * * Return: Success: The end-of-address marker. * Failure: HADDR_UNDEF * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static haddr_t H5FD_stream_get_eoa (H5FD_t *_stream) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; FUNC_ENTER (H5FD_stream_get_eoa, HADDR_UNDEF); FUNC_LEAVE (stream->eoa); } /*------------------------------------------------------------------------- * Function: H5FD_stream_set_eoa * * Purpose: Set the end-of-address marker for the file. This function is * called shortly after an existing HDF5 file is opened in order * to tell the driver where the end of the HDF5 data is located. * * Return: Success: 0 * Failure: -1 * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, haddr_t addr) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; FUNC_ENTER (H5FD_stream_set_eoa, FAIL); if (ADDR_OVERFLOW (addr)) { HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, FAIL, "address overflow"); } stream->eoa = addr; FUNC_LEAVE (0); } /*------------------------------------------------------------------------- * Function: H5FD_stream_get_eof * * Purpose: Returns the end-of-file marker, which is the greater of * either the size of the underlying memory or the HDF5 * end-of-address markers. * * Return: Success: End of file address, the first address past * the end of the "file", either the memory * or the HDF5 file. * Failure: HADDR_UNDEF * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static haddr_t H5FD_stream_get_eof (H5FD_t *_stream) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; FUNC_ENTER (H5FD_stream_get_eof, HADDR_UNDEF); FUNC_LEAVE (MAX (stream->eof, stream->eoa)); } /*------------------------------------------------------------------------- * Function: H5FD_stream_read * * Purpose: Reads SIZE bytes of data from FILE beginning at address ADDR * into buffer BUF according to data transfer properties in * DXPL_ID. * * Return: Success: 0 * Result is stored in caller-supplied buffer BUF * Failure: -1 * Contents of buffer BUF are undefined * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static herr_t H5FD_stream_read (H5FD_t *_stream, H5FD_mem_t UNUSED type, hid_t UNUSED dxpl_id, haddr_t addr, size_t size, void *buf /*out*/) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; size_t nbytes; FUNC_ENTER (H5FD_stream_read, FAIL); assert (stream && stream->pub.cls); assert (buf); /* Check for overflow conditions */ if (HADDR_UNDEF == addr) { HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); } if (REGION_OVERFLOW (addr, size)) { HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); } if (addr + size > stream->eoa) { HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); } /* Read the part which is before the EOF marker */ if (addr < stream->eof) { nbytes = MIN (size, stream->eof - addr); HDmemcpy (buf, stream->mem + addr, nbytes); size -= nbytes; addr += nbytes; buf = (char *) buf + nbytes; } /* Read zeros for the part which is after the EOF markers */ if (size > 0) { HDmemset (buf, 0, size); } FUNC_LEAVE (SUCCEED); } /*------------------------------------------------------------------------- * Function: H5FD_stream_write * * Purpose: Writes SIZE bytes of data to FILE beginning at address ADDR * from buffer BUF according to data transfer properties in * DXPL_ID. * * Return: Success: Zero * Failure: -1 * * Programmer: Thomas Radke * Tuesday, September 12, 2000 * * Modifications: * *------------------------------------------------------------------------- */ static herr_t H5FD_stream_write (H5FD_t *_stream, H5FD_mem_t UNUSED type, hid_t UNUSED dxpl_id, haddr_t addr, size_t size, const void *buf) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; FUNC_ENTER (H5FD_stream_write, FAIL); assert (stream && stream->pub.cls); assert (buf); /* Check for overflow conditions */ if (REGION_OVERFLOW (addr, size)) { HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); } if (addr + size > stream->eoa) { HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); } /* * Allocate more memory if necessary, careful of overflow. Also, if the * allocation fails then the file should remain in a usable state. Be * careful of non-Posix realloc() that doesn't understand what to do when * the first argument is null. */ if (addr + size > stream->eof) { unsigned char *x; haddr_t new_eof = stream->fapl.increment * ((addr+size) / stream->fapl.increment); if ((addr+size) % stream->fapl.increment) { new_eof += stream->fapl.increment; } if (stream->mem == NULL) { x = H5MM_malloc ((size_t) new_eof); } else { x = H5MM_realloc (stream->mem, (size_t) new_eof); } if (x == NULL) { HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory block"); } stream->mem = x; stream->eof = new_eof; } /* Write from BUF to memory */ HDmemcpy (stream->mem + addr, buf, size); stream->dirty = TRUE; FUNC_LEAVE (SUCCEED); } #endif /* H5_HAVE_STREAM */