diff options
author | Thomas Radke <tradke@aei.mpg.de> | 2000-09-15 11:35:39 (GMT) |
---|---|---|
committer | Thomas Radke <tradke@aei.mpg.de> | 2000-09-15 11:35:39 (GMT) |
commit | 5a4c55b42c8123e69f85be7fa75740fa428b2deb (patch) | |
tree | 000b22e59f6c1677a2cccafcb924ee9d7387b0ed | |
parent | 55bd85f6e531e230958e635b7bebee08b2b3db93 (diff) | |
download | hdf5-5a4c55b42c8123e69f85be7fa75740fa428b2deb.zip hdf5-5a4c55b42c8123e69f85be7fa75740fa428b2deb.tar.gz hdf5-5a4c55b42c8123e69f85be7fa75740fa428b2deb.tar.bz2 |
[svn-r2555] Purpose:
Added source files for the Stream Virtual File Driver.
Description:
The Stream VFD allows users to stream complete HDF5 files
via socket connections between different applications.
Files which were created anew are flushed to any connected client
on each H5Fflush() or H5Fclose() operation.
Files which are opened as read-only will be read from a socket
on a H5Fopen() call.
The driver's H5FDset_fapl_stream() routine allows to pass in
several parameters such as an external socket descriptor,
some socket options, and flags for broadcasting a received file.
If an external socket is provided the Stream VFD would use that
for the socket calls. Otherwise it parses the filename argument
in H5Fcreate()/H5Fopen() for a 'hostname::port' parameter.
All files processed by the Stream VFD are kept in memory
(same way as the core VFD does).
Platforms:
Tested so far under Linux, Irix 32/64bit, OSF1, Solaris, Cray Unicos,
Hitachi SR8000, IBM AIX.
Not tested under Windows yet.
-rw-r--r-- | src/H5FDstream.c | 984 | ||||
-rw-r--r-- | src/H5FDstream.h | 53 |
2 files changed, 1037 insertions, 0 deletions
diff --git a/src/H5FDstream.c b/src/H5FDstream.c new file mode 100644 index 0000000..cd0e6e6 --- /dev/null +++ b/src/H5FDstream.c @@ -0,0 +1,984 @@ +/* + * Copyright © 2000 The author. + * The author prefers this code not be used for military purposes. + * + * + * Author: Thomas Radke <tradke@aei-potsdam.mpg.de> + * Tuesday, September 12, 2000 + * + * Purpose: This code provides the Stream Virtual File Driver. + * It is very much based on the core VFD which keeps an + * entire HDF5 data file to be processed in main memory. + * In addition to that, the memory image of the file is + * read from/written to a socket during an open/flush operation. + */ + +#include <netdb.h> /* gethostbyname */ +#include <sys/types.h> /* socket stuff */ +#include <sys/socket.h> /* socket stuff */ +#include <netinet/in.h> /* socket stuff */ +#include <netinet/tcp.h> /* socket stuff */ + +#include <H5Eprivate.h> /* error handling */ +#include <H5FDpublic.h> /* VFD structures */ +#include <H5MMprivate.h> /* memory allocation */ +#include <H5Ppublic.h> /* files */ +#include <H5FDstream.h> /* Stream VFD header */ + + +/* Only build this driver if it was configured with --with-Stream-VFD */ +#ifdef H5_HAVE_STREAM + +/* Some useful macros */ +#undef MAX +#define MAX(x,y) ((x) > (y) ? (x) : (y)) +#undef MIN +#define MIN(x,y) ((x) < (y) ? (x) : (y)) + +/* Uncomment this to switch on debugging output */ +/* #define DEBUG 1 */ + +/* The driver identification number, initialized at runtime */ +static hid_t H5FD_STREAM_g = 0; + +/* + * The description of a file belonging to this driver. The `eoa' and `eof' + * determine the amount of hdf5 address space in use and the high-water mark + * of the file (the current size of the underlying memory). + */ +typedef struct H5FD_stream_t +{ + H5FD_t pub; /* public stuff, must be first */ + H5FD_stream_fapl_t fapl; /* file access property list */ + unsigned char *mem; /* the underlying memory */ + haddr_t eoa; /* end of allocated region */ + haddr_t eof; /* current allocated size */ + int socket; /* socket to write / read from */ + hbool_t dirty; /* flag indicating unflushed data */ + hbool_t internal_socket; /* flag indicating an internal socket */ +} H5FD_stream_t; + +/* Allocate memory in multiples of this size (in bytes) by default */ +#define H5FD_STREAM_INCREMENT 8192 + +/* default backlog argument for listen call */ +#define H5FD_STREAM_BACKLOG 1 + +/* default file access property list */ +static const H5FD_stream_fapl_t default_fapl = +{ + H5FD_STREAM_INCREMENT, /* address space allocation blocksize */ + -1, /* no external socket descriptor */ + TRUE, /* enable I/O on socket */ + H5FD_STREAM_BACKLOG, /* default backlog for listen(2) */ + NULL, /* do not broadcast received files */ + NULL /* argument to READ broadcast routine */ +}; + +/* + * These macros check for overflow of various quantities. These macros + * assume that file_offset_t is signed and haddr_t and size_t are unsigned. + * + * ADDR_OVERFLOW: Checks whether a file address of type `haddr_t' + * is too large to be represented by the second argument + * of the file seek function. + * + * SIZE_OVERFLOW: Checks whether a buffer size of type `hsize_t' is too + * large to be represented by the `size_t' type. + * + * REGION_OVERFLOW: Checks whether an address and size pair describe data + * which can be addressed entirely in memory. + */ +#ifdef HAVE_LSEEK64 +# define file_offset_t off64_t +#else +# define file_offset_t off_t +#endif +#define MAXADDR (((haddr_t)1<<(8*sizeof(file_offset_t)-1))-1) +#define ADDR_OVERFLOW(A) (HADDR_UNDEF==(A) || \ + ((A) & ~(haddr_t)MAXADDR)) +#define SIZE_OVERFLOW(Z) ((Z) & ~(hsize_t)MAXADDR) +#define REGION_OVERFLOW(A,Z) (ADDR_OVERFLOW(A) || SIZE_OVERFLOW(Z) || \ + HADDR_UNDEF==(A)+(Z) || \ + (size_t)((A)+(Z))<(size_t)(A)) + +/* Function prototypes */ +static void *H5FD_stream_fapl_get (H5FD_t *_stream); +static H5FD_t *H5FD_stream_open (const char *name, unsigned flags, + hid_t fapl_id, haddr_t maxaddr); +static herr_t H5FD_stream_flush (H5FD_t *_stream); +static herr_t H5FD_stream_close (H5FD_t *_stream); +static haddr_t H5FD_stream_get_eoa (H5FD_t *_stream); +static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, haddr_t addr); +static haddr_t H5FD_stream_get_eof (H5FD_t *_stream); +static herr_t H5FD_stream_read (H5FD_t *_stream, hid_t fapl_id, haddr_t addr, + hsize_t size, void *buf); +static herr_t H5FD_stream_write (H5FD_t *_stream, H5FD_mem_t type, + hid_t fapl_id, haddr_t addr, + hsize_t size, const void *buf); + +/* The Stream VFD's class information structure */ +static const H5FD_class_t H5FD_stream_g = +{ + "stream", /* name */ + MAXADDR, /* maxaddr */ + NULL, /* sb_size */ + NULL, /* sb_encode */ + NULL, /* sb_decode */ + sizeof (H5FD_stream_fapl_t), /* fapl_size */ + H5FD_stream_fapl_get, /* fapl_get */ + NULL, /* fapl_copy */ + NULL, /* fapl_free */ + 0, /* dxpl_size */ + NULL, /* dxpl_copy */ + NULL, /* dxpl_free */ + H5FD_stream_open, /* open */ + H5FD_stream_close, /* close */ + NULL, /* query */ + NULL, /* cmp */ + NULL, /* alloc */ + NULL, /* free */ + H5FD_stream_get_eoa, /* get_eoa */ + H5FD_stream_set_eoa, /* set_eoa */ + H5FD_stream_get_eof, /* get_eof */ + H5FD_stream_read, /* read */ + H5FD_stream_write, /* write */ + H5FD_stream_flush, /* flush */ + H5FD_FLMAP_SINGLE, /* fl_map */ +}; + +/* Interface initialization */ +#define INTERFACE_INIT H5FD_stream_init +static intn interface_initialize_g = 0; + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_init + * + * Purpose: Initialize this driver by registering it with the library. + * + * Return: Success: The driver ID for the Stream driver. + * Failure: Negative. + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +hid_t H5FD_stream_init (void) +{ + FUNC_ENTER (H5FD_stream_init, FAIL); + + if (H5I_VFL != H5Iget_type (H5FD_STREAM_g)) + H5FD_STREAM_g = H5FDregister (&H5FD_stream_g); + + FUNC_LEAVE (H5FD_STREAM_g); +} + + +/*------------------------------------------------------------------------- + * Function: H5Pset_fapl_stream + * + * Purpose: Modify the file access property list to use the Stream + * driver defined in this source file. The INCREMENT specifies + * how much to grow the memory each time we need more. + * If a valid SOCKET argument is given this will be used + * by the driver instead of parsing the 'hostname:port' filename + * and opening a socket internally. + * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +herr_t H5Pset_fapl_stream (hid_t fapl_id, H5FD_stream_fapl_t *fapl) +{ + herr_t result; + H5FD_stream_fapl_t user_fapl; + + + FUNC_ENTER (H5FD_set_fapl_stream, FAIL); + H5TRACE2 ("e", "ix", fapl_id, fapl); + + if (H5P_FILE_ACCESS != H5Pget_class (fapl_id)) + HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); + + if (fapl) + { + if (! fapl->do_socket_io && fapl->broadcast_fn == NULL) + HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, FAIL, + "read broadcast function pointer is NULL"); + + user_fapl = *fapl; + if (fapl->increment == 0) + { + user_fapl.increment = H5FD_STREAM_INCREMENT; + } + result = H5Pset_driver (fapl_id, H5FD_STREAM, &user_fapl); + } + else + { + result = H5Pset_driver (fapl_id, H5FD_STREAM, &default_fapl); + } + + FUNC_LEAVE (result); +} + + +/*------------------------------------------------------------------------- + * Function: H5Pget_fapl_stream + * + * Purpose: Queries properties set by the H5Pset_fapl_stream() function. + * + * Return: Success: Non-negative + * Failure: Negative + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +herr_t H5Pget_fapl_stream(hid_t fapl_id, H5FD_stream_fapl_t *fapl /* out */) +{ + H5FD_stream_fapl_t *this_fapl; + + + FUNC_ENTER (H5Pget_fapl_stream, FAIL); + H5TRACE2("e","ix",fapl_id,fapl); + + if (H5P_FILE_ACCESS != H5Pget_class (fapl_id)) + HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); + if (H5FD_STREAM != H5Pget_driver (fapl_id)) + HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "incorrect VFL driver"); + if (NULL == (this_fapl = H5Pget_driver_info (fapl_id))) + HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "bad VFL driver info"); + + if (fapl) + *fapl = *this_fapl; + + FUNC_LEAVE (SUCCEED); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_fapl_get + * + * Purpose: Returns a copy of the file access properties + * + * Return: Success: Ptr to new file access properties + * Failure: NULL + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static void *H5FD_stream_fapl_get (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + H5FD_stream_fapl_t *fapl; + + + FUNC_ENTER (H5FD_stream_fapl_get, NULL); + + if ((fapl = H5MM_calloc (sizeof (H5FD_stream_fapl_t))) == NULL) + HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, NULL, "memory allocation failed"); + + *fapl = stream->fapl; + + FUNC_LEAVE (fapl); +} + + +static int H5FDstream_open_socket(const char *filename, int o_flags, + unsigned int backlog, + const char **errormsg, + H5E_major_t *major, H5E_minor_t *minor) +{ + struct sockaddr_in sin; + struct hostent *he; + unsigned short int port; + int sock; + char *hostname; + const char *separator, *tmp; + const int on = 1; + + + sock = -1; + *errormsg = NULL; + + /* Parse "hostname:port" from filename argument */ + for (separator = filename; *separator != ':' && *separator; separator++); + if (separator == filename || !*separator) + { + *errormsg = "invalid host address"; + } + else + { + tmp = separator; + if (! tmp [1]) + { + *errormsg = "no port number"; + } + while (*errormsg == NULL && *++tmp) + { + if (! isdigit (*tmp)) + { + *errormsg = "invalid port number"; + } + } + } + + /* Return if parsing the filename failed */ + if (*errormsg) + { + *major = H5E_ARGS; *minor = H5E_BADVALUE; + return (sock); + } + + hostname = (char *) H5MM_malloc (separator - filename + 1); + + /* Return if out of memory */ + if (hostname == NULL) + { + *major = H5E_RESOURCE; *minor = H5E_NOSPACE; + *errormsg = "memory allocation failed"; + return (sock); + } + + strncpy (hostname, filename, separator - filename); + hostname [separator - filename] = 0; + port = atoi (separator + 1); + + memset (&sin, 0, sizeof (sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons (port); + + if (! (he = gethostbyname (hostname))) + *errormsg = "unable to get host address"; + else if ((sock = socket (AF_INET, SOCK_STREAM, 0)) < 0) + *errormsg = "unable to open socket"; + + if (*errormsg == NULL) + { + if (O_RDONLY == o_flags) + { + memcpy (&sin.sin_addr, he->h_addr, he->h_length); +#ifdef DEBUG + fprintf (stderr, "Stream VFD: connecting to host '%s' port %d\n", + hostname, port); +#endif + if (connect (sock, (struct sockaddr *) &sin, sizeof (sin)) < 0) + *errormsg = "unable to connect"; + } + else + { + sin.sin_addr.s_addr = INADDR_ANY; + if (fcntl (sock, F_SETFL, O_NONBLOCK) < 0) + *errormsg = "unable to set non-blocking mode for socket"; + else if (setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) + *errormsg = "unable to set socket option TCP_NODELAY"; + else if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) + *errormsg = "unable to set socket option SO_REUSEADDR"; + else if (bind (sock, (struct sockaddr *) &sin, sizeof (sin)) < 0) + *errormsg = "unable to bind socket"; + else if (listen (sock, backlog) < 0) + *errormsg = "unable to listen on socket"; + } + } + + H5MM_xfree (hostname); + + /* Return if opening the socket failed */ + if (*errormsg) + { + if (sock >= 0) + { + close (sock); + sock = -1; + } + *major = H5E_FILE; *minor = H5E_CANTOPENFILE; + } + + return (sock); +} + + +static void H5FDstream_read_from_socket (H5FD_stream_t *stream, + const char **errormsg, + H5E_major_t *major, H5E_minor_t *minor) +{ + ssize_t size; + size_t max_size = 0; + unsigned char *ptr; + + + *errormsg = NULL; + stream->eof = 0; + stream->mem = NULL; + + while (1) + { + if (max_size <= 0) + { + /* + * Allocate initial buffer as increment + 1 + * to prevent unnecessary reallocation + * if increment is exactly a multiple of the filesize + */ + max_size = stream->fapl.increment; + if (! stream->mem) + max_size++; + ptr = H5MM_realloc (stream->mem, stream->eof + max_size); + if (! ptr) + { + *major = H5E_RESOURCE; *minor = H5E_NOSPACE; + *errormsg = "unable to allocate file space buffer"; + return; + } + stream->mem = ptr; + ptr += stream->eof; + } + + /* now receive the next chunk of data */ + size = read (stream->socket, ptr, max_size); + if (size < 0 && (EINTR == errno || EAGAIN == errno)) + continue; + if (size < 0) + { + *major = H5E_IO; *minor = H5E_READERROR; + *errormsg = "error reading from file from socket"; + return; + } + if (! size) + break; + max_size -= (size_t) size; + stream->eof += (haddr_t) size; + ptr += size; +#ifdef DEBUG + fprintf (stderr, "Stream VFD: read %d bytes (%d total) from socket\n", + (int) size, (int) stream->eof); +#endif + } + +#ifdef DEBUG + fprintf (stderr, "Stream VFD: read total of %d bytes from socket\n", + (int) stream->eof); +#endif +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_open + * + * Purpose: Opens an HDF5 file in memory. + * + * Return: Success: A pointer to a new file data structure. The + * public fields will be initialized by the + * caller, which is always H5FD_open(). + * Failure: NULL + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static H5FD_t *H5FD_stream_open (const char *filename, + unsigned flags, + hid_t fapl_id, + haddr_t maxaddr) +{ + H5FD_stream_t _stream, *stream; + const H5FD_stream_fapl_t *fapl; + int o_flags; + H5E_major_t major; + H5E_minor_t minor; + const char *errormsg; + + + FUNC_ENTER (H5FD_stream_open, NULL); + + /* Check arguments */ + if (filename == NULL|| *filename == '\0') + HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, NULL,"invalid file name"); + if (maxaddr == 0 || HADDR_UNDEF == maxaddr) + HRETURN_ERROR (H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr"); + if (ADDR_OVERFLOW (maxaddr)) + HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, NULL, "maxaddr overflow"); + + /* Build the open flags */ + o_flags = (H5F_ACC_RDWR & flags) ? O_RDWR : O_RDONLY; + if (H5F_ACC_TRUNC & flags) o_flags |= O_TRUNC; + if (H5F_ACC_CREAT & flags) o_flags |= O_CREAT; + if (H5F_ACC_EXCL & flags) o_flags |= O_EXCL; + + if ((O_RDWR & o_flags) && ! (O_CREAT & o_flags)) + HRETURN_ERROR (H5E_ARGS, H5E_UNSUPPORTED, NULL, + "open stream for read/write not supported"); + + if (H5P_DEFAULT != fapl_id) + fapl = H5Pget_driver_info (fapl_id); + if (fapl == NULL) + fapl = &default_fapl; + + /* zero out file structure and set file access property list */ + memset (&_stream, 0, sizeof (_stream)); + _stream.fapl = *fapl; + + errormsg = NULL; + + /* if an external socket is provided with the file access property list + we use that, otherwise the filename argument is parsed and a socket + is opened internally */ + if (fapl->do_socket_io) + { + if (fapl->socket >= 0) + { + _stream.internal_socket = FALSE; + _stream.socket = fapl->socket; + } + else + { + _stream.internal_socket = TRUE; + _stream.socket = H5FDstream_open_socket (filename, o_flags, fapl->backlog, + &errormsg, &major, &minor); + } + } + else + { + _stream.socket = -1; + } + + /* read the data from socket into memory */ + if (O_RDONLY == o_flags) + { + if (errormsg == NULL && fapl->do_socket_io) + { +#ifdef DEBUG + fprintf (stderr, "Stream VFD: reading file from socket\n"); +#endif + H5FDstream_read_from_socket (&_stream, &errormsg, &major, &minor); + } + + /* Now call the user's broadcast routine if given */ + if (fapl->broadcast_fn) + { + if ((fapl->broadcast_fn) (&_stream.mem, &_stream.eof, + fapl->broadcast_arg) < 0) + { + /* don't override previous error codes */ + if (errormsg == NULL) + { + major = H5E_IO; minor = H5E_READERROR; + errormsg = "broadcast error"; + } + } + + /* check for filesize of zero bytes */ + if (errormsg == NULL && _stream.eof == 0) + { + major = H5E_IO; minor = H5E_READERROR; + errormsg = "zero filesize"; + } + } + + /* For files which are read from a socket: + the opened socket is not needed anymore */ + if (errormsg == NULL) + { + if (_stream.internal_socket && _stream.socket >= 0) + close (_stream.socket); + _stream.socket = -1; + } + } + + /* Create the new file struct */ + stream = NULL; + if (errormsg == NULL) + { + stream = (H5FD_stream_t *) H5MM_calloc (sizeof (H5FD_stream_t)); + if (stream == NULL) + { + major = H5E_RESOURCE; minor = H5E_NOSPACE; + errormsg = "unable to allocate file struct"; + } + else + { + *stream = _stream; + } + } + + if (errormsg) + { + if (_stream.mem) + H5MM_xfree (_stream.mem); + if (_stream.internal_socket && _stream.socket >= 0) + close (_stream.socket); + HRETURN_ERROR (major, minor, NULL, errormsg); + } + + FUNC_LEAVE ((H5FD_t *) stream); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_flush + * + * Purpose: Flushes the file via sockets to any connected clients + * if its dirty flag is set. + * + * Return: Success: 0 + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_flush (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + haddr_t size; + unsigned char *ptr; + struct sockaddr from; +#if !(defined(linux) || defined(sun)) + typedef int socklen_t; +#endif + socklen_t fromlen; + int sock; + + + FUNC_ENTER (H5FD_stream_flush, FAIL); + + /* Write to backing store */ + if (stream->dirty && stream->socket >= 0) + { +#ifdef DEBUG + fprintf (stderr, "Stream VFD: accepting client connections\n"); +#endif + fromlen = sizeof (from); + while ((sock = accept (stream->socket, &from, &fromlen)) >= 0) + { + if (fcntl (sock, F_SETFL, O_NONBLOCK) < 0) + { + close (sock); + continue; /* continue the loop for other clients' requests */ + } + + size = stream->eof; + ptr = stream->mem; + + while (size) + { + ssize_t n = write (sock, ptr, size); + if (n < 0 && (EINTR == errno || EAGAIN == errno)) + continue; + if (n < 0) + { + close (sock); + /* FIXME: continue the loop for other clients here */ + HRETURN_ERROR (H5E_IO, H5E_WRITEERROR, FAIL, + "error writing to socket"); + } + ptr += (size_t) n; + size -= (size_t) n; +#ifdef DEBUG + fprintf (stderr, "Stream VFD: wrote %d bytes to socket, %d in total, " + "%d left\n", (int) n, ptr - stream->mem, (int) size); +#endif + } + close (sock); + } + stream->dirty = FALSE; + } + + FUNC_LEAVE (SUCCEED); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_close + * + * Purpose: Closes the file. + * + * Return: Success: 0 + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_close (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_close, FAIL); + + /* Flush */ + if (H5FD_stream_flush (_stream) < 0) + HRETURN_ERROR (H5E_FILE, H5E_CANTFLUSH, FAIL, "unable to flush file"); + + /* Release resources */ + if (stream->socket >= 0 && stream->internal_socket) + { + close (stream->socket); + } + if (stream->mem) + { + H5MM_xfree (stream->mem); + } + memset (stream, 0, sizeof (H5FD_stream_t)); + + FUNC_LEAVE (0); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_get_eoa + * + * Purpose: Gets the end-of-address marker for the file. The EOA marker + * is the first address past the last byte allocated in the + * format address space. + * + * Return: Success: The end-of-address marker. + * Failure: HADDR_UNDEF + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static haddr_t H5FD_stream_get_eoa (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_get_eoa, HADDR_UNDEF); + + FUNC_LEAVE (stream->eoa); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_set_eoa + * + * Purpose: Set the end-of-address marker for the file. This function is + * called shortly after an existing HDF5 file is opened in order + * to tell the driver where the end of the HDF5 data is located. + * + * Return: Success: 0 + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, + haddr_t addr) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_set_eoa, FAIL); + + if (ADDR_OVERFLOW (addr)) + HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, FAIL, "address overflow"); + + stream->eoa = addr; + + FUNC_LEAVE (0); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_get_eof + * + * Purpose: Returns the end-of-file marker, which is the greater of + * either the size of the underlying memory or the HDF5 + * end-of-address markers. + * + * Return: Success: End of file address, the first address past + * the end of the "file", either the memory + * or the HDF5 file. + * Failure: HADDR_UNDEF + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static haddr_t H5FD_stream_get_eof (H5FD_t *_stream) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_get_eof, HADDR_UNDEF); + + FUNC_LEAVE (MAX (stream->eof, stream->eoa)); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_read + * + * Purpose: Reads SIZE bytes of data from FILE beginning at address ADDR + * into buffer BUF according to data transfer properties in + * DXPL_ID. + * + * Return: Success: 0 + * Result is stored in caller-supplied buffer BUF + * Failure: -1 + * Contents of buffer BUF are undefined + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_read (H5FD_t *_stream, + hid_t UNUSED dxpl_id, + haddr_t addr, + hsize_t size, + void *buf /*out*/) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + ssize_t nbytes; + + + FUNC_ENTER (H5FD_stream_read, FAIL); + + assert (stream && stream->pub.cls); + assert (buf); + + /* Check for overflow conditions */ + if (HADDR_UNDEF == addr) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + if (REGION_OVERFLOW (addr, size)) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + if (addr + size > stream->eoa) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + + /* Read the part which is before the EOF marker */ + if (addr < stream->eof) + { + nbytes = MIN (size, stream->eof - addr); + memcpy (buf, stream->mem + addr, nbytes); + size -= nbytes; + addr += nbytes; + buf = (char *) buf + nbytes; + } + + /* Read zeros for the part which is after the EOF markers */ + if (size > 0) + memset (buf, 0, size); + + FUNC_LEAVE (SUCCEED); +} + + +/*------------------------------------------------------------------------- + * Function: H5FD_stream_write + * + * Purpose: Writes SIZE bytes of data to FILE beginning at address ADDR + * from buffer BUF according to data transfer properties in + * DXPL_ID. + * + * Return: Success: Zero + * Failure: -1 + * + * Programmer: Thomas Radke + * Tuesday, September 12, 2000 + * + * Modifications: + * + *------------------------------------------------------------------------- + */ +static herr_t H5FD_stream_write (H5FD_t *_stream, + H5FD_mem_t UNUSED type, + hid_t UNUSED dxpl_id, + haddr_t addr, + hsize_t size, + const void *buf) +{ + H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + + + FUNC_ENTER (H5FD_stream_write, FAIL); + + assert (stream && stream->pub.cls); + assert (buf); + + /* Check for overflow conditions */ + if (REGION_OVERFLOW (addr, size)) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + if (addr + size > stream->eoa) + HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); + + /* + * Allocate more memory if necessary, careful of overflow. Also, if the + * allocation fails then the file should remain in a usable state. Be + * careful of non-Posix realloc() that doesn't understand what to do when + * the first argument is null. + */ + if (addr + size > stream->eof) + { + unsigned char *x; + haddr_t new_eof = stream->fapl.increment * + ((addr+size) / stream->fapl.increment); + + + if ((addr+size) % stream->fapl.increment) + { + new_eof += stream->fapl.increment; + } + if (stream->mem == NULL) + { + x = H5MM_malloc (new_eof); + } + else + { + x = H5MM_realloc (stream->mem, new_eof); + } + if (x == NULL) + HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, FAIL, + "unable to allocate memory block"); + stream->mem = x; + stream->eof = new_eof; + } + + /* Write from BUF to memory */ + memcpy (stream->mem + addr, buf, size); + stream->dirty = TRUE; + + FUNC_LEAVE (SUCCEED); +} + +#endif /* H5_HAVE_STREAM */ diff --git a/src/H5FDstream.h b/src/H5FDstream.h new file mode 100644 index 0000000..c8acd65 --- /dev/null +++ b/src/H5FDstream.h @@ -0,0 +1,53 @@ +/* + * Copyright © 2000 The author. + * The author prefers this code not be used for military purposes. + * + * + * Author: Thomas Radke <tradke@aei-potsdam.mpg.de> + * Tuesday, September 12, 2000 + * + * Purpose: The public header file for the Stream Virtual File Driver. + */ +#ifndef H5FDstream_H +#define H5FDstream_H + +#ifdef H5_HAVE_STREAM + +#include <H5Ipublic.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define H5FD_STREAM (H5FD_stream_init()) + +/* prototype for read broadcast callback routine */ +typedef int (*H5FD_stream_broadcast_t) (unsigned char **file, + haddr_t *len, + void *arg); + +/* Driver-specific file access properties */ +typedef struct H5FD_stream_fapl_t +{ + size_t increment; /* how much to grow memory in reallocs */ + int socket; /* external socket descriptor */ + hbool_t do_socket_io; /* do I/O on socket */ + unsigned int backlog; /* backlog argument for listen call */ + H5FD_stream_broadcast_t broadcast_fn; /* READ broadcast callback */ + void *broadcast_arg; /* READ broadcast callback user argument*/ +} H5FD_stream_fapl_t; + + +__DLL__ hid_t H5FD_stream_init (void); +__DLL__ herr_t H5Pset_fapl_stream (hid_t fapl_id, + H5FD_stream_fapl_t *fapl); +__DLL__ herr_t H5Pget_fapl_stream (hid_t fapl_id, + H5FD_stream_fapl_t *fapl /*out*/ ); + +#ifdef __cplusplus +} +#endif + +#endif /* H5_HAVE_STREAM */ + +#endif /* H5FDstream_H */ |