diff options
-rw-r--r-- | src/H5FDstream.c | 984 | ||||
-rw-r--r-- | src/H5FDstream.h | 53 |
2 files changed, 1037 insertions, 0 deletions
diff --git a/src/H5FDstream.c b/src/H5FDstream.c new file mode 100644 index 0000000..cd0e6e6 --- /dev/null +++ b/src/H5FDstream.c @@ -0,0 +1,984 @@ +/* + * Copyright © 2000 The author. + * The author prefers this code not be used for military purposes. + * + * + * Author: Thomas Radke <tradke@aei-potsdam.mpg.de> + * Tuesday, September 12, 2000 + * + * Purpose: This code provides the Stream Virtual File Driver. + * It is very much based on the core VFD which keeps an + * entire HDF5 data file to be processed in main memory. + * In addition to that, the memory image of the file is + * read from/written to a socket during an open/flush operation. + */ + +#include <netdb.h> /* gethostbyname */ +#include <sys/types.h> /* socket stuff */ +#include <sys/socket.h> /* socket stuff */ +#include <netinet/in.h> /* socket stuff */ +#include <netinet/tcp.h> /* socket stuff */ + +#include <H5Eprivate.h> /* error handling */ +#include <H5FDpublic.h> /* VFD structures */ +#include <H5MMprivate.h> /* memory allocation */ +#include <H5Ppublic.h> /* files */ +#include <H5FDstream.h> /* Stream VFD header */ + + +/* Only build this driver if it was configured with --with-Stream-VFD */ +#ifdef H5_HAVE_STREAM + +/* Some useful macros */ +#undef MAX +#define MAX(x,y) ((x) > (y) ? (x) : (y)) +#undef MIN +#define MIN(x,y) ((x) < (y) ? (x) : (y)) + +/* Uncomment this to switch on debugging output */ +/* #define DEBUG 1 */ + +/* The driver identification number, initialized at runtime */ +static hid_t H5FD_STREAM_g = 0; + +/* + * The description of a file belonging to this driver. The `eoa' and `eof' + * determine the amount of hdf5 address space in use and the high-water mark + * of the file (the current size of the underlying memory). + */ +typedef struct H5FD_stream_t +{ + H5FD_t pub; /* public stuff, must be first */ + H5FD_stream_fapl_t fapl; /* file access property list */ + unsigned char *mem; /* the underlying memory */ + haddr_t eoa; /* end of allocated region */ + haddr_t eof; /* current allocated size */ + int socket; /* socket to write / read from */ + hbool_t dirty; /* flag indicating unflushed data */ + hbool_t internal_socket; /* flag indicating an internal socket */ +} H5FD_stream_t; + +/* Allocate memory in multiples of this size (in bytes) by default */ +#define H5FD_STREAM_INCREMENT 8192 + +/* default backlog argument for listen call */ +#define H5FD_STREAM_BACKLOG 1 + +/* default file access property list */ +static const H5FD_stream_fapl_t default_fapl = +{ + H5FD_STREAM_INCREMENT, /* address space allocation blocksize */ + -1, /* no external socket descriptor */ + TRUE, /* enable I/O on socket */ + H5FD_STREAM_BACKLOG, /* default backlog for listen(2) */ + NULL, /* do not broadcast received files */ + NULL /* argument to READ broadcast routine */ +}; + +/* + * These macros check for overflow of various quantities. These macros + * assume that file_offset_t is signed and haddr_t and size_t are unsigned. + * + * ADDR_OVERFLOW: Checks whether a file address of type `haddr_t' + * is too large to be represented by the second argument + * of the file seek function. + * + * SIZE_OVERFLOW: Checks whether a buffer size of type `hsize_t' is too + * large to be represented by the `size_t' type. + * + * REGION_OVERFLOW: Checks whether an address and size pair describe data + * which can be addressed entirely in memory. + */ +#ifdef HAVE_LSEEK64 +# define file_offset_t off64_t +#else +# define file_offset_t off_t +#endif +#define MAXADDR (((haddr_t)1<<(8*sizeof(file_offset_t)-1))-1) +#define ADDR_OVERFLOW(A) (HADDR_UNDEF==(A) || \ + ((A) & ~(haddr_t)MAXADDR)) +#define SIZE_OVERFLOW(Z) ((Z) & ~(hsize_t)MAXADDR) +#define REGION_OVERFLOW(A,Z) (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || \ + HADDR_UNDEF==(A)+(Z) || \ + (size_t)((A)+(Z))<(size_t)(A)) + +/* Function prototypes */ +static void *H5FD_stream_fapl_get (H5FD_t *_stream); +static H5FD_t *H5FD_stream_open (const char *name, unsigned flags, + hid_t fapl_id, haddr_t maxaddr); +static herr_t H5FD_stream_flush (H5FD_t *_stream); +static herr_t H5FD_stream_close (H5FD_t *_stream); +static haddr_t H5FD_stream_get_eoa (H5FD_t *_stream); +static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, haddr_t addr); +static haddr_t H5FD_stream_get_eof (H5FD_t *_stream); +static herr_t H5FD_stream_read (H5FD_t *_stream, hid_t fapl_id, haddr_t addr, + hsize_t size, void *buf); +static herr_t H5FD_stream_write (H5FD_t *_stream, H5FD_mem_t type, + hid_t fapl_id, haddr_t addr, + hsize_t size, const void *buf); + +/* The Stream VFD's class information structure */ +static const H5FD_class_t H5FD_stream_g = +{ + "stream", /* name */ + MAXADDR, /* maxaddr */ + NULL, /* sb_size */ + NULL, /* sb_encode */ + NULL, /* sb_decode */ + sizeof (H5FD_stream_fapl_t), /* fapl_size */ + H5FD_stream_fapl_get, /* fapl_get */ + NULL, /* fapl_copy */ + NULL, /* fapl_free */ + 0, /* dxpl_size */ + NULL, /* dxpl_copy */ + NULL, /* dxpl_free */ + H5FD_stream_open, /* open */ + H5FD_stream_close, /* close */ + NULL, /* query */ + NULL, /* cmp */ + NULL, /* alloc */ + NULL, /* free */ + H5FD_stream_get_eoa, /* get_eoa */ + H5FD_stream_set_eoa, /* set_eoa */ + H5FD_stream_get_eof, /* get_eof */ + H5FD_stream_read, /* read */ + H5FD_stream_write, /* write */ + H5FD_stream_flush, /* flush */ + H5FD_FLMAP_SINGLE, /* fl_map */ +}; + +/* Interface initialization */ +#define INTERFACE_INIT H5FD_stream_init +static intn interface_initialize_g = 0; + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_init + * + * Purpose: Initialize this driver by registering it with the library. + * + * Return: Success: The driver ID for the Stream driver. + * Failure: Negative. + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +hid_t H5FD_stream_init (void) +{ + FUNC_ENTER (H5FD_stream_init, FAIL); + + if (H5I_VFL != H5Iget_type (H5FD_STREAM_g)) + H5FD_STREAM_g = H5FDregister (&H5FD_stream_g); + + FUNC_LEAVE (H5FD_STREAM_g); +} + + +/*------------------------------------------------------------------------- + * Function: H5Pset_fapl_stream + * + * Purpose: Modify the file access property list to use the Stream + * driver defined in this source file. The INCREMENT specifies + * how much to grow the memory each time we need more. + * If a valid SOCKET argument is given this will be used + * by the driver instead of parsing the 'hostname:port' filename + * and opening a socket internally. + * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +herr_t H5Pset_fapl_stream (hid_t fapl_id, H5FD_stream_fapl_t *fapl) +{ + herr_t result; + H5FD_stream_fapl_t user_fapl; + + + FUNC_ENTER (H5FD_set_fapl_stream, FAIL); + H5TRACE2 ("e", "ix", fapl_id, fapl); + + if (H5P_FILE_ACCESS != H5Pget_class (fapl_id)) + HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); + + if (fapl) + { + if (! fapl->do_socket_io && fapl->broadcast_fn == NULL) + HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, FAIL, + "read broadcast function pointer is NULL"); + + user_fapl = *fapl; + if (fapl->increment == 0) + { + user_fapl.increment = H5FD_STREAM_INCREMENT; + } + result = H5Pset_driver (fapl_id, H5FD_STREAM, &user_fapl); + } + else + { + result = H5Pset_driver (fapl_id, H5FD_STREAM, &default_fapl); + } + + FUNC_LEAVE (result); +} + + +/*------------------------------------------------------------------------- + * Function: H5Pget_fapl_stream + * + * Purpose: Queries properties set by the H5Pset_fapl_stream() function. + * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +herr_t H5Pget_fapl_stream(hid_t fapl_id, H5FD_stream_fapl_t *fapl /* out */) +{ + H5FD_stream_fapl_t *this_fapl; + + + FUNC_ENTER (H5Pget_fapl_stream, FAIL); + H5TRACE2("e","ix",fapl_id,fapl); + + if (H5P_FILE_ACCESS != H5Pget_class (fapl_id)) + HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); + if (H5FD_STREAM != H5Pget_driver (fapl_id)) + HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "incorrect VFL driver"); + if (NULL == (this_fapl = H5Pget_driver_info (fapl_id))) + HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "bad VFL driver info"); + + if (fapl) + *fapl = *this_fapl; + + FUNC_LEAVE (SUCCEED); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_fapl_get + * + * Purpose: Returns a copy of the file access properties + * + * Return: Success: Ptr to new file access properties + * Failure: NULL + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static void *H5FD_stream_fapl_get (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + H5FD_stream_fapl_t *fapl; + + + FUNC_ENTER (H5FD_stream_fapl_get, NULL); + + if ((fapl = H5MM_calloc (sizeof (H5FD_stream_fapl_t))) == NULL) + HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, NULL, "memory allocation failed"); + + *fapl = stream->fapl; + + FUNC_LEAVE (fapl); +} + + +static int H5FDstream_open_socket(const char *filename, int o_flags, + unsigned int backlog, + const char **errormsg, + H5E_major_t *major, H5E_minor_t *minor) +{ + struct sockaddr_in sin; + struct hostent *he; + unsigned short int port; + int sock; + char *hostname; + const char *separator, *tmp; + const int on = 1; + + + sock = -1; + *errormsg = NULL; + + /* Parse "hostname:port" from filename argument */ + for (separator = filename; *separator != ':' && *separator; separator++); + if (separator == filename || !*separator) + { + *errormsg = "invalid host address"; + } + else + { + tmp = separator; + if (! tmp [1]) + { + *errormsg = "no port number"; + } + while (*errormsg == NULL && *++tmp) + { + if (! isdigit (*tmp)) + { + *errormsg = "invalid port number"; + } + } + } + + /* Return if parsing the filename failed */ + if (*errormsg) + { + *major = H5E_ARGS; *minor = H5E_BADVALUE; + return (sock); + } + + hostname = (char *) H5MM_malloc (separator - filename + 1); + + /* Return if out of memory */ + if (hostname == NULL) + { + *major = H5E_RESOURCE; *minor = H5E_NOSPACE; + *errormsg = "memory allocation failed"; + return (sock); + } + + strncpy (hostname, filename, separator - filename); + hostname [separator - filename] = 0; + port = atoi (separator + 1); + + memset (&sin, 0, sizeof (sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons (port); + + if (! (he = gethostbyname (hostname))) + *errormsg = "unable to get host address"; + else if ((sock = socket (AF_INET, SOCK_STREAM, 0)) < 0) + *errormsg = "unable to open socket"; + + if (*errormsg == NULL) + { + if (O_RDONLY == o_flags) + { + memcpy (&sin.sin_addr, he->h_addr, he->h_length); +#ifdef DEBUG + fprintf (stderr, "Stream VFD: connecting to host '%s' port %d\n", + hostname, port); +#endif + if (connect (sock, (struct sockaddr *) &sin, sizeof (sin)) < 0) + *errormsg = "unable to connect"; + } + else + { + sin.sin_addr.s_addr = INADDR_ANY; + if (fcntl (sock, F_SETFL, O_NONBLOCK) < 0) + *errormsg = "unable to set non-blocking mode for socket"; + else if (setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) + *errormsg = "unable to set socket option TCP_NODELAY"; + else if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) + *errormsg = "unable to set socket option SO_REUSEADDR"; + else if (bind (sock, (struct sockaddr *) &sin, sizeof (sin)) < 0) + *errormsg = "unable to bind socket"; + else if (listen (sock, backlog) < 0) + *errormsg = "unable to listen on socket"; + } + } + + H5MM_xfree (hostname); + + /* Return if opening the socket failed */ + if (*errormsg) + { + if (sock >= 0) + { + close (sock); + sock = -1; + } + *major = H5E_FILE; *minor = H5E_CANTOPENFILE; + } + + return (sock); +} + + +static void H5FDstream_read_from_socket (H5FD_stream_t *stream, + const char **errormsg, + H5E_major_t *major, H5E_minor_t *minor) +{ + ssize_t size; + size_t max_size = 0; + unsigned char *ptr; + + + *errormsg = NULL; + stream->eof = 0; + stream->mem = NULL; + + while (1) + { + if (max_size <= 0) + { + /* + * Allocate initial buffer as increment + 1 + * to prevent unnecessary reallocation + * if increment is exactly a multiple of the filesize + */ + max_size = stream->fapl.increment; + if (! stream->mem) + max_size++; + ptr = H5MM_realloc (stream->mem, stream->eof + max_size); + if (! ptr) + { + *major = H5E_RESOURCE; *minor = H5E_NOSPACE; + *errormsg = "unable to allocate file space buffer"; + return; + } + stream->mem = ptr; + ptr += stream->eof; + } + + /* now receive the next chunk of data */ + size = read (stream->socket, ptr, max_size); + if (size < 0 && (EINTR == errno || EAGAIN == errno)) + continue; + if (size < 0) + { + *major = H5E_IO; *minor = H5E_READERROR; + *errormsg = "error reading from file from socket"; + return; + } + if (! size) + break; + max_size -= (size_t) size; + stream->eof += (haddr_t) size; + ptr += size; +#ifdef DEBUG + fprintf (stderr, "Stream VFD: read %d bytes (%d total) from socket\n", + (int) size, (int) stream->eof); +#endif + } + +#ifdef DEBUG + fprintf (stderr, "Stream VFD: read total of %d bytes from socket\n", + (int) stream->eof); +#endif +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_open + * + * Purpose: Opens an HDF5 file in memory. + * + * Return: Success: A pointer to a new file data structure. The + * public fields will be initialized by the + * caller, which is always H5FD_open(). + * Failure: NULL + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static H5FD_t *H5FD_stream_open (const char *filename, + unsigned flags, + hid_t fapl_id, + haddr_t maxaddr) +{ + H5FD_stream_t _stream, *stream; + const H5FD_stream_fapl_t *fapl; + int o_flags; + H5E_major_t major; + H5E_minor_t minor; + const char *errormsg; + + + FUNC_ENTER (H5FD_stream_open, NULL); + + /* Check arguments */ + if (filename == NULL|| *filename == '\0') + HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, NULL,"invalid file name"); + if (maxaddr == 0 || HADDR_UNDEF == maxaddr) + HRETURN_ERROR (H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr"); + if (ADDR_OVERFLOW (maxaddr)) + HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, NULL, "maxaddr overflow"); + + /* Build the open flags */ + o_flags = (H5F_ACC_RDWR & flags) ? O_RDWR : O_RDONLY; + if (H5F_ACC_TRUNC & flags) o_flags |= O_TRUNC; + if (H5F_ACC_CREAT & flags) o_flags |= O_CREAT; + if (H5F_ACC_EXCL & flags) o_flags |= O_EXCL; + + if ((O_RDWR & o_flags) && ! (O_CREAT & o_flags)) + HRETURN_ERROR (H5E_ARGS, H5E_UNSUPPORTED, NULL, + "open stream for read/write not supported"); + + if (H5P_DEFAULT != fapl_id) + fapl = H5Pget_driver_info (fapl_id); + if (fapl == NULL) + fapl = &default_fapl; + + /* zero out file structure and set file access property list */ + memset (&_stream, 0, sizeof (_stream)); + _stream.fapl = *fapl; + + errormsg = NULL; + + /* if an external socket is provided with the file access property list + we use that, otherwise the filename argument is parsed and a socket + is opened internally */ + if (fapl->do_socket_io) + { + if (fapl->socket >= 0) + { + _stream.internal_socket = FALSE; + _stream.socket = fapl->socket; + } + else + { + _stream.internal_socket = TRUE; + _stream.socket = H5FDstream_open_socket (filename, o_flags, fapl->backlog, + &errormsg, &major, &minor); + } + } + else + { + _stream.socket = -1; + } + + /* read the data from socket into memory */ + if (O_RDONLY == o_flags) + { + if (errormsg == NULL && fapl->do_socket_io) + { +#ifdef DEBUG + fprintf (stderr, "Stream VFD: reading file from socket\n"); +#endif + H5FDstream_read_from_socket (&_stream, &errormsg, &major, &minor); + } + + /* Now call the user's broadcast routine if given */ + if (fapl->broadcast_fn) + { + if ((fapl->broadcast_fn) (&_stream.mem, &_stream.eof, + fapl->broadcast_arg) < 0) + { + /* don't override previous error codes */ + if (errormsg == NULL) + { + major = H5E_IO; minor = H5E_READERROR; + errormsg = "broadcast error"; + } + } + + /* check for filesize of zero bytes */ + if (errormsg == NULL && _stream.eof == 0) + { + major = H5E_IO; minor = H5E_READERROR; + errormsg = "zero filesize"; + } + } + + /* For files which are read from a socket: + the opened socket is not needed anymore */ + if (errormsg == NULL) + { + if (_stream.internal_socket && _stream.socket >= 0) + close (_stream.socket); + _stream.socket = -1; + } + } + + /* Create the new file struct */ + stream = NULL; + if (errormsg == NULL) + { + stream = (H5FD_stream_t *) H5MM_calloc (sizeof (H5FD_stream_t)); + if (stream == NULL) + { + major = H5E_RESOURCE; minor = H5E_NOSPACE; + errormsg = "unable to allocate file struct"; + } + else + { + *stream = _stream; + } + } + + if (errormsg) + { + if (_stream.mem) + H5MM_xfree (_stream.mem); + if (_stream.internal_socket && _stream.socket >= 0) + close (_stream.socket); + HRETURN_ERROR (major, minor, NULL, errormsg); + } + + FUNC_LEAVE ((H5FD_t *) stream); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_flush + * + * Purpose: Flushes the file via sockets to any connected clients + * if its dirty flag is set. + * + * Return: Success: 0 + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_flush (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + haddr_t size; + unsigned char *ptr; + struct sockaddr from; +#if !(defined(linux) || defined(sun)) + typedef int socklen_t; +#endif + socklen_t fromlen; + int sock; + + + FUNC_ENTER (H5FD_stream_flush, FAIL); + + /* Write to backing store */ + if (stream->dirty && stream->socket >= 0) + { +#ifdef DEBUG + fprintf (stderr, "Stream VFD: accepting client connections\n"); +#endif + fromlen = sizeof (from); + while ((sock = accept (stream->socket, &from, &fromlen)) >= 0) + { + if (fcntl (sock, F_SETFL, O_NONBLOCK) < 0) + { + close (sock); + continue; /* continue the loop for other clients' requests */ + } + + size = stream->eof; + ptr = stream->mem; + + while (size) + { + ssize_t n = write (sock, ptr, size); + if (n < 0 && (EINTR == errno || EAGAIN == errno)) + continue; + if (n < 0) + { + close (sock); + /* FIXME: continue the loop for other clients here */ + HRETURN_ERROR (H5E_IO, H5E_WRITEERROR, FAIL, + "error writing to socket"); + } + ptr += (size_t) n; + size -= (size_t) n; +#ifdef DEBUG + fprintf (stderr, "Stream VFD: wrote %d bytes to socket, %d in total, " + "%d left\n", (int) n, ptr - stream->mem, (int) size); +#endif + } + close (sock); + } + stream->dirty = FALSE; + } + + FUNC_LEAVE (SUCCEED); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_close + * + * Purpose: Closes the file. + * + * Return: Success: 0 + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_close (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_close, FAIL); + + /* Flush */ + if (H5FD_stream_flush (_stream) < 0) + HRETURN_ERROR (H5E_FILE, H5E_CANTFLUSH, FAIL, "unable to flush file"); + + /* Release resources */ + if (stream->socket >= 0 && stream->internal_socket) + { + close (stream->socket); + } + if (stream->mem) + { + H5MM_xfree (stream->mem); + } + memset (stream, 0, sizeof (H5FD_stream_t)); + + FUNC_LEAVE (0); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_get_eoa + * + * Purpose: Gets the end-of-address marker for the file. The EOA marker + * is the first address past the last byte allocated in the + * format address space. + * + * Return: Success: The end-of-address marker. + * Failure: HADDR_UNDEF + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static haddr_t H5FD_stream_get_eoa (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_get_eoa, HADDR_UNDEF); + + FUNC_LEAVE (stream->eoa); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_set_eoa + * + * Purpose: Set the end-of-address marker for the file. This function is + * called shortly after an existing HDF5 file is opened in order + * to tell the driver where the end of the HDF5 data is located. + * + * Return: Success: 0 + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, + haddr_t addr) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_set_eoa, FAIL); + + if (ADDR_OVERFLOW (addr)) + HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, FAIL, "address overflow"); + + stream->eoa = addr; + + FUNC_LEAVE (0); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_get_eof + * + * Purpose: Returns the end-of-file marker, which is the greater of + * either the size of the underlying memory or the HDF5 + * end-of-address markers. + * + * Return: Success: End of file address, the first address past + * the end of the "file", either the memory + * or the HDF5 file. + * Failure: HADDR_UNDEF + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static haddr_t H5FD_stream_get_eof (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_get_eof, HADDR_UNDEF); + + FUNC_LEAVE (MAX (stream->eof, stream->eoa)); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_read + * + * Purpose: Reads SIZE bytes of data from FILE beginning at address ADDR + * into buffer BUF according to data transfer properties in + * DXPL_ID. + * + * Return: Success: 0 + * Result is stored in caller-supplied buffer BUF + * Failure: -1 + * Contents of buffer BUF are undefined + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_read (H5FD_t *_stream, + hid_t UNUSED dxpl_id, + haddr_t addr, + hsize_t size, + void *buf /*out*/) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + ssize_t nbytes; + + + FUNC_ENTER (H5FD_stream_read, FAIL); + + assert (stream && stream->pub.cls); + assert (buf); + + /* Check for overflow conditions */ + if (HADDR_UNDEF == addr) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + if (REGION_OVERFLOW (addr, size)) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + if (addr + size > stream->eoa) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + + /* Read the part which is before the EOF marker */ + if (addr < stream->eof) + { + nbytes = MIN (size, stream->eof - addr); + memcpy (buf, stream->mem + addr, nbytes); + size -= nbytes; + addr += nbytes; + buf = (char *) buf + nbytes; + } + + /* Read zeros for the part which is after the EOF markers */ + if (size > 0) + memset (buf, 0, size); + + FUNC_LEAVE (SUCCEED); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_write + * + * Purpose: Writes SIZE bytes of data to FILE beginning at address ADDR + * from buffer BUF according to data transfer properties in + * DXPL_ID. + * + * Return: Success: Zero + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_write (H5FD_t *_stream, + H5FD_mem_t UNUSED type, + hid_t UNUSED dxpl_id, + haddr_t addr, + hsize_t size, + const void *buf) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_write, FAIL); + + assert (stream && stream->pub.cls); + assert (buf); + + /* Check for overflow conditions */ + if (REGION_OVERFLOW (addr, size)) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + if (addr + size > stream->eoa) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + + /* + * Allocate more memory if necessary, careful of overflow. Also, if the + * allocation fails then the file should remain in a usable state. Be + * careful of non-Posix realloc() that doesn't understand what to do when + * the first argument is null. + */ + if (addr + size > stream->eof) + { + unsigned char *x; + haddr_t new_eof = stream->fapl.increment * + ((addr+size) / stream->fapl.increment); + + + if ((addr+size) % stream->fapl.increment) + { + new_eof += stream->fapl.increment; + } + if (stream->mem == NULL) + { + x = H5MM_malloc (new_eof); + } + else + { + x = H5MM_realloc (stream->mem, new_eof); + } + if (x == NULL) + HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, FAIL, + "unable to allocate memory block"); + stream->mem = x; + stream->eof = new_eof; + } + + /* Write from BUF to memory */ + memcpy (stream->mem + addr, buf, size); + stream->dirty = TRUE; + + FUNC_LEAVE (SUCCEED); +} + +#endif /* H5_HAVE_STREAM */ diff --git a/src/H5FDstream.h b/src/H5FDstream.h new file mode 100644 index 0000000..c8acd65 --- /dev/null +++ b/src/H5FDstream.h @@ -0,0 +1,53 @@ +/* + * Copyright © 2000 The author. + * The author prefers this code not be used for military purposes. + * + * + * Author: Thomas Radke <tradke@aei-potsdam.mpg.de> + * Tuesday, September 12, 2000 + * + * Purpose: The public header file for the Stream Virtual File Driver. + */ +#ifndef H5FDstream_H +#define H5FDstream_H + +#ifdef H5_HAVE_STREAM + +#include <H5Ipublic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define H5FD_STREAM (H5FD_stream_init()) + +/* prototype for read broadcast callback routine */ +typedef int (*H5FD_stream_broadcast_t) (unsigned char **file, + haddr_t *len, + void *arg); + +/* Driver-specific file access properties */ +typedef struct H5FD_stream_fapl_t +{ + size_t increment; /* how much to grow memory in reallocs */ + int socket; /* external socket descriptor */ + hbool_t do_socket_io; /* do I/O on socket */ + unsigned int backlog; /* backlog argument for listen call */ + H5FD_stream_broadcast_t broadcast_fn; /* READ broadcast callback */ + void *broadcast_arg; /* READ broadcast callback user argument*/ +} H5FD_stream_fapl_t; + + +__DLL__ hid_t H5FD_stream_init (void); +__DLL__ herr_t H5Pset_fapl_stream (hid_t fapl_id, + H5FD_stream_fapl_t *fapl); +__DLL__ herr_t H5Pget_fapl_stream (hid_t fapl_id, + H5FD_stream_fapl_t *fapl /*out*/ ); + +#ifdef __cplusplus +} +#endif + +#endif /* H5_HAVE_STREAM */ + +#endif /* H5FDstream_H */ |