diff options
Diffstat (limited to 'src/H5FDstream.c')
-rw-r--r-- | src/H5FDstream.c | 266 |
1 files changed, 73 insertions, 193 deletions
diff --git a/src/H5FDstream.c b/src/H5FDstream.c index 72cb7b8..60907f6 100644 --- a/src/H5FDstream.c +++ b/src/H5FDstream.c @@ -221,18 +221,15 @@ hid_t H5FD_stream_init (void) { FUNC_ENTER_NOAPI(H5FD_stream_init, FAIL); - if (H5I_VFL != H5Iget_type (H5FD_STREAM_g)) - { + if (H5I_VFL != H5Iget_type (H5FD_STREAM_g)) { H5FD_STREAM_g = H5FDregister (&H5FD_stream_g); /* set the process signal mask to ignore SIGPIPE signals */ /* NOTE: Windows doesn't know SIGPIPE signals that's why the #ifdef */ #ifdef SIGPIPE if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) - { fprintf (stderr, "Stream VFD warning: failed to set the process signal " "mask to ignore SIGPIPE signals\n"); - } #endif } @@ -264,36 +261,29 @@ herr_t H5Pset_fapl_stream (hid_t fapl_id, H5FD_stream_fapl_t *fapl) { H5FD_stream_fapl_t user_fapl; H5P_genplist_t *plist; /* Property list pointer */ - herr_t result; + herr_t ret_value=SUCCEED; /* Return value */ FUNC_ENTER_API(H5Pset_fapl_stream, FAIL); H5TRACE2 ("e", "ix", fapl_id, fapl); if(TRUE!=H5P_isa_class(fapl_id,H5P_FILE_ACCESS) || NULL == (plist = H5I_object(fapl_id))) - HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); + HGOTO_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); - if (fapl) - { + if (fapl) { if (! fapl->do_socket_io && fapl->broadcast_fn == NULL) - { - HRETURN_ERROR (H5E_ARGS, H5E_BADVALUE, FAIL, - "read broadcast function pointer is NULL"); - } + HGOTO_ERROR (H5E_ARGS, H5E_BADVALUE, FAIL, "read broadcast function pointer is NULL"); user_fapl = *fapl; if (fapl->increment == 0) - { user_fapl.increment = H5FD_STREAM_INCREMENT; - } user_fapl.port = 0; - result = H5P_set_driver (plist, H5FD_STREAM, &user_fapl); + ret_value = H5P_set_driver (plist, H5FD_STREAM, &user_fapl); } else - { - result = H5P_set_driver (plist, H5FD_STREAM, &default_fapl); - } + ret_value = H5P_set_driver (plist, H5FD_STREAM, &default_fapl); - FUNC_LEAVE (result); +done: + FUNC_LEAVE (ret_value); } @@ -316,29 +306,23 @@ herr_t H5Pget_fapl_stream(hid_t fapl_id, H5FD_stream_fapl_t *fapl /* out */) { H5FD_stream_fapl_t *this_fapl; H5P_genplist_t *plist; /* Property list pointer */ + herr_t ret_value=SUCCEED; /* Return value */ FUNC_ENTER_API(H5Pget_fapl_stream, FAIL); H5TRACE2("e","ix",fapl_id,fapl); if(TRUE!=H5P_isa_class(fapl_id,H5P_FILE_ACCESS) || NULL == (plist = H5I_object(fapl_id))) - { - HRETURN_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); - } + HGOTO_ERROR (H5E_PLIST, H5E_BADTYPE, FAIL, "not a fapl"); if (H5FD_STREAM != H5P_get_driver (plist)) - { - HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "incorrect VFL driver"); - } + HGOTO_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "incorrect VFL driver"); if (NULL == (this_fapl = H5P_get_driver_info (plist))) - { - HRETURN_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "bad VFL driver info"); - } + HGOTO_ERROR (H5E_PLIST, H5E_BADVALUE, FAIL, "bad VFL driver info"); if (fapl) - { *fapl = *this_fapl; - } - FUNC_LEAVE (SUCCEED); +done: + FUNC_LEAVE (ret_value); } @@ -362,18 +346,20 @@ H5FD_stream_fapl_get (H5FD_t *_stream) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; H5FD_stream_fapl_t *fapl; - + H5FD_t *ret_value; FUNC_ENTER_NOAPI(H5FD_stream_fapl_get, NULL); if ((fapl = H5MM_calloc (sizeof (H5FD_stream_fapl_t))) == NULL) - { - HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, NULL, "memory allocation failed"); - } + HGOTO_ERROR (H5E_RESOURCE, H5E_NOSPACE, NULL, "memory allocation failed"); *fapl = stream->fapl; - FUNC_LEAVE (fapl); + /* Set return value */ + ret_value=fapl; + +done: + FUNC_LEAVE (ret_value); } @@ -396,22 +382,14 @@ H5FD_stream_open_socket (const char *filename, int o_flags, for (separator = filename; *separator != ':' && *separator; separator++) ; if (separator == filename || !*separator) - { HGOTO_ERROR(H5E_ARGS,H5E_BADVALUE,H5FD_STREAM_INVALID_SOCKET,"invalid host address"); - } - else - { + else { tmp = separator; if (! tmp[1]) - { HGOTO_ERROR(H5E_ARGS,H5E_BADVALUE,H5FD_STREAM_INVALID_SOCKET,"no port number"); - } - while (*++tmp) - { + while (*++tmp) { if (! isdigit (*tmp)) - { HGOTO_ERROR(H5E_ARGS,H5E_BADVALUE,H5FD_STREAM_INVALID_SOCKET,"invalid port number"); - } } } @@ -419,9 +397,7 @@ H5FD_stream_open_socket (const char *filename, int o_flags, /* Return if out of memory */ if (hostname == NULL) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"memory allocation failed"); - } HDstrncpy (hostname, filename, (size_t)(separator - filename)); hostname[separator - filename] = 0; @@ -432,72 +408,49 @@ H5FD_stream_open_socket (const char *filename, int o_flags, server.sin_port = htons (fapl->port); if (! (he = gethostbyname (hostname))) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to get host address"); - } else if (H5FD_STREAM_ERROR_CHECK (sock = socket (AF_INET, SOCK_STREAM, 0))) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to open socket"); - } - if (O_RDONLY == o_flags) - { + if (O_RDONLY == o_flags) { HDmemcpy (&server.sin_addr, he->h_addr, (size_t)he->h_length); #ifdef DEBUG fprintf (stderr, "Stream VFD: connecting to host '%s' port %d\n", hostname, fapl->port); #endif if (connect (sock, (struct sockaddr *) &server, sizeof (server)) < 0) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to connect"); - } } - else - { + else { server.sin_addr.s_addr = INADDR_ANY; if (H5FD_STREAM_IOCTL_SOCKET (sock, FIONBIO, &on) < 0) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to set non-blocking mode for socket"); - } else if (setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (const char *) &on, sizeof(on)) < 0) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to set socket option TCP_NODELAY"); - } else if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, sizeof(on)) < 0) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to set socket option SO_REUSEADDR"); - } - else - { + else { /* Try to bind the socket to the given port. If maxhunt is given try some successive ports also. */ first_port = fapl->port; - while (fapl->port <= first_port + fapl->maxhunt) - { + while (fapl->port <= first_port + fapl->maxhunt) { #ifdef DEBUG fprintf (stderr, "Stream VFD: binding to port %d\n", fapl->port); #endif server.sin_port = htons (fapl->port); if (bind (sock, (struct sockaddr *) &server, sizeof (server)) < 0) - { fapl->port++; - } else - { break; - } } - if (fapl->port > first_port + fapl->maxhunt) - { + if (fapl->port > first_port + fapl->maxhunt) { fapl->port = 0; HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to bind socket"); } else if (listen (sock, fapl->backlog) < 0) - { HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,H5FD_STREAM_INVALID_SOCKET,"unable to listen on socket"); - } } } @@ -514,6 +467,7 @@ done: if (!H5FD_STREAM_ERROR_CHECK(sock)) H5FD_STREAM_CLOSE_SOCKET(sock); } /* end if */ + FUNC_LEAVE(ret_value); } @@ -531,10 +485,8 @@ H5FD_stream_read_from_socket (H5FD_stream_t *stream) stream->eof = 0; stream->mem = NULL; - while (1) - { - if (max_size <= 0) - { + while (1) { + if (max_size <= 0) { /* * Allocate initial buffer as increment + 1 * to prevent unnecessary reallocation @@ -542,14 +494,10 @@ H5FD_stream_read_from_socket (H5FD_stream_t *stream) */ max_size = stream->fapl.increment; if (! stream->mem) - { max_size++; - } ptr = H5MM_realloc (stream->mem, (size_t) (stream->eof + max_size)); if (! ptr) - { - HRETURN_ERROR(H5E_RESOURCE,H5E_NOSPACE,FAIL,"unable to allocate file space buffer"); - } + HGOTO_ERROR(H5E_RESOURCE,H5E_NOSPACE,FAIL,"unable to allocate file space buffer"); stream->mem = ptr; ptr += stream->eof; } @@ -558,17 +506,11 @@ H5FD_stream_read_from_socket (H5FD_stream_t *stream) size = recv (stream->socket, ptr, max_size, 0); if (size < 0 && (EINTR == errno || EAGAIN == errno || EWOULDBLOCK)) - { continue; - } if (size < 0) - { - HRETURN_ERROR(H5E_IO,H5E_READERROR,FAIL,"error reading from file from socket"); - } + HGOTO_ERROR(H5E_IO,H5E_READERROR,FAIL,"error reading from file from socket"); if (! size) - { break; - } max_size -= (size_t) size; stream->eof += (haddr_t) size; ptr += size; @@ -582,6 +524,7 @@ H5FD_stream_read_from_socket (H5FD_stream_t *stream) fprintf (stderr, "Stream VFD: read total of %d bytes from socket\n", (int) stream->eof); #endif +done: FUNC_LEAVE(ret_value); } @@ -616,23 +559,17 @@ H5FD_stream_open (const char *filename, WSADATA wsadata; #endif H5P_genplist_t *plist; /* Property list pointer */ - H5FD_t *ret_value=NULL; /* Function return value */ + H5FD_t *ret_value; /* Function return value */ FUNC_ENTER_NOAPI(H5FD_stream_open, NULL); /* Check arguments */ if (filename == NULL|| *filename == '\0') - { HGOTO_ERROR (H5E_ARGS, H5E_BADVALUE, NULL,"invalid file name"); - } if (maxaddr == 0 || HADDR_UNDEF == maxaddr) - { HGOTO_ERROR (H5E_ARGS, H5E_BADRANGE, NULL, "bogus maxaddr"); - } if (ADDR_OVERFLOW (maxaddr)) - { HGOTO_ERROR (H5E_ARGS, H5E_OVERFLOW, NULL, "maxaddr overflow"); - } /* Build the open flags */ o_flags = (H5F_ACC_RDWR & flags) ? O_RDWR : O_RDONLY; @@ -641,54 +578,41 @@ H5FD_stream_open (const char *filename, if (H5F_ACC_EXCL & flags) o_flags |= O_EXCL; if ((O_RDWR & o_flags) && ! (O_CREAT & o_flags)) - { HGOTO_ERROR (H5E_ARGS, H5E_UNSUPPORTED, NULL, "open stream for read/write not supported"); - } #ifdef WIN32 if (WSAStartup (MAKEWORD (2, 0), &wsadata)) - { HGOTO_ERROR (H5E_IO, H5E_CANTINIT, NULL, "Couldn't start Win32 socket layer"); - } #endif fapl = NULL; - if (H5P_DEFAULT != fapl_id) - { + if (H5P_DEFAULT != fapl_id) { if(TRUE!=H5P_isa_class(fapl_id,H5P_FILE_ACCESS) || NULL == (plist = H5I_object(fapl_id))) HGOTO_ERROR(H5E_ARGS, H5E_BADTYPE, NULL, "not a file access property list"); fapl = H5P_get_driver_info (plist); } if (fapl == NULL) - { fapl = &default_fapl; - } /* Create the new file struct */ stream = (H5FD_stream_t *) H5MM_calloc (sizeof (H5FD_stream_t)); if (stream == NULL) - { HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, NULL, "unable to allocate file struct"); - } stream->fapl = *fapl; stream->socket = H5FD_STREAM_INVALID_SOCKET; /* if an external socket is provided with the file access property list we use that, otherwise the filename argument is parsed and a socket is opened internally */ - if (fapl->do_socket_io) - { - if (! H5FD_STREAM_ERROR_CHECK (fapl->socket)) - { + if (fapl->do_socket_io) { + if (! H5FD_STREAM_ERROR_CHECK (fapl->socket)) { stream->internal_socket = FALSE; stream->socket = fapl->socket; } - else - { + else { stream->internal_socket = TRUE; stream->socket = H5FD_stream_open_socket (filename, o_flags, &stream->fapl); - if (stream->socket != H5FD_STREAM_INVALID_SOCKET) - { + if (stream->socket != H5FD_STREAM_INVALID_SOCKET) { /* update the port ID in the file access property so that it can be queried via H5P_get_fapl_stream() later on */ H5P_set_driver (plist, H5FD_STREAM, &stream->fapl); @@ -699,10 +623,8 @@ H5FD_stream_open (const char *filename, } /* read the data from socket into memory */ - if (O_RDONLY == o_flags) - { - if (fapl->do_socket_io) - { + if (O_RDONLY == o_flags) { + if (fapl->do_socket_io) { #ifdef DEBUG fprintf (stderr, "Stream VFD: reading file from socket\n"); #endif @@ -711,27 +633,20 @@ H5FD_stream_open (const char *filename, } /* Now call the user's broadcast routine if given */ - if (fapl->broadcast_fn) - { + if (fapl->broadcast_fn) { if ((fapl->broadcast_fn) (&stream->mem, &stream->eof, fapl->broadcast_arg) < 0) - { HGOTO_ERROR(H5E_IO, H5E_READERROR, NULL, "broadcast error"); - } /* check for filesize of zero bytes */ if (stream->eof == 0) - { HGOTO_ERROR(H5E_IO, H5E_READERROR, NULL, "zero filesize"); - } } /* For files which are read from a socket: the opened socket is not needed anymore */ if (stream->internal_socket && ! H5FD_STREAM_ERROR_CHECK (stream->socket)) - { H5FD_STREAM_CLOSE_SOCKET (stream->socket); - } stream->socket = H5FD_STREAM_INVALID_SOCKET; } @@ -742,13 +657,9 @@ done: if(ret_value==NULL) { if(stream!=NULL) { if (stream->mem) - { - H5MM_xfree (stream->mem); - } + H5MM_xfree (stream->mem); if (stream->internal_socket && ! H5FD_STREAM_ERROR_CHECK (stream->socket)) - { - H5FD_STREAM_CLOSE_SOCKET (stream->socket); - } + H5FD_STREAM_CLOSE_SOCKET (stream->socket); H5MM_xfree(stream); } /* end if */ } @@ -788,17 +699,14 @@ H5FD_stream_flush (H5FD_t *_stream, unsigned UNUSED closing) FUNC_ENTER_NOAPI(H5FD_stream_flush, FAIL); /* Write to backing store */ - if (stream->dirty && ! H5FD_STREAM_ERROR_CHECK (stream->socket)) - { + if (stream->dirty && ! H5FD_STREAM_ERROR_CHECK (stream->socket)) { #ifdef DEBUG fprintf (stderr, "Stream VFD: accepting client connections\n"); #endif fromlen = sizeof (from); while (! H5FD_STREAM_ERROR_CHECK (sock = accept (stream->socket, - &from, &fromlen))) - { - if (H5FD_STREAM_IOCTL_SOCKET (sock, FIONBIO, &on) < 0) - { + &from, &fromlen))) { + if (H5FD_STREAM_IOCTL_SOCKET (sock, FIONBIO, &on) < 0) { H5FD_STREAM_CLOSE_SOCKET (sock); continue; /* continue the loop for other clients to connect */ } @@ -806,15 +714,11 @@ H5FD_stream_flush (H5FD_t *_stream, unsigned UNUSED closing) size = stream->eof; ptr = stream->mem; - while (size) - { + while (size) { bytes_send = send (sock, ptr, size, 0); - if (bytes_send < 0) - { + if (bytes_send < 0) { if (EINTR == errno || EAGAIN == errno || EWOULDBLOCK == errno) - { continue; - } /* continue the outermost loop for other clients to connect */ break; @@ -854,28 +758,24 @@ static herr_t H5FD_stream_close (H5FD_t *_stream) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + herr_t ret_value=SUCCEED; /* Return value */ FUNC_ENTER_NOAPI(H5FD_stream_close, FAIL); /* Flush */ if (H5FD_stream_flush (_stream,TRUE) != SUCCEED) - { - HRETURN_ERROR (H5E_FILE, H5E_CANTFLUSH, FAIL, "unable to flush file"); - } + HGOTO_ERROR (H5E_FILE, H5E_CANTFLUSH, FAIL, "unable to flush file"); /* Release resources */ if (! H5FD_STREAM_ERROR_CHECK (stream->socket) && stream->internal_socket) - { H5FD_STREAM_CLOSE_SOCKET (stream->socket); - } if (stream->mem) - { H5MM_xfree (stream->mem); - } HDmemset (stream, 0, sizeof (H5FD_stream_t)); H5MM_xfree (stream); - FUNC_LEAVE (0); +done: + FUNC_LEAVE (ret_value); } @@ -963,17 +863,17 @@ static herr_t H5FD_stream_set_eoa (H5FD_t *_stream, haddr_t addr) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + herr_t ret_value=SUCCEED; /* Return value */ FUNC_ENTER_NOAPI(H5FD_stream_set_eoa, FAIL); if (ADDR_OVERFLOW (addr)) - { - HRETURN_ERROR (H5E_ARGS, H5E_OVERFLOW, FAIL, "address overflow"); - } + HGOTO_ERROR (H5E_ARGS, H5E_OVERFLOW, FAIL, "address overflow"); stream->eoa = addr; - FUNC_LEAVE (0); +done: + FUNC_LEAVE (ret_value); } @@ -1036,6 +936,7 @@ H5FD_stream_read (H5FD_t *_stream, { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; size_t nbytes; + herr_t ret_value=SUCCEED; /* Return value */ FUNC_ENTER_NOAPI(H5FD_stream_read, FAIL); @@ -1044,21 +945,14 @@ H5FD_stream_read (H5FD_t *_stream, /* Check for overflow conditions */ if (HADDR_UNDEF == addr) - { - HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); - } + HGOTO_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); if (REGION_OVERFLOW (addr, size)) - { - HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); - } + HGOTO_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); if (addr + size > stream->eoa) - { - HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); - } + HGOTO_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); /* Read the part which is before the EOF marker */ - if (addr < stream->eof) - { + if (addr < stream->eof) { nbytes = MIN (size, stream->eof - addr); HDmemcpy (buf, stream->mem + addr, nbytes); size -= nbytes; @@ -1068,11 +962,10 @@ H5FD_stream_read (H5FD_t *_stream, /* Read zeros for the part which is after the EOF markers */ if (size > 0) - { HDmemset (buf, 0, size); - } - FUNC_LEAVE (SUCCEED); +done: + FUNC_LEAVE (ret_value); } @@ -1102,6 +995,7 @@ H5FD_stream_write (H5FD_t *_stream, const void *buf) { H5FD_stream_t *stream = (H5FD_stream_t *) _stream; + herr_t ret_value=SUCCEED; /* Return value */ FUNC_ENTER_NOAPI(H5FD_stream_write, FAIL); @@ -1110,13 +1004,9 @@ H5FD_stream_write (H5FD_t *_stream, /* Check for overflow conditions */ if (REGION_OVERFLOW (addr, size)) - { - HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); - } + HGOTO_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); if (addr + size > stream->eoa) - { - HRETURN_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); - } + HGOTO_ERROR (H5E_IO, H5E_OVERFLOW, FAIL, "file address overflowed"); /* * Allocate more memory if necessary, careful of overflow. Also, if the @@ -1124,30 +1014,19 @@ H5FD_stream_write (H5FD_t *_stream, * careful of non-Posix realloc() that doesn't understand what to do when * the first argument is null. */ - if (addr + size > stream->eof) - { + if (addr + size > stream->eof) { unsigned char *x; haddr_t new_eof = stream->fapl.increment * ((addr+size) / stream->fapl.increment); - if ((addr+size) % stream->fapl.increment) - { new_eof += stream->fapl.increment; - } if (stream->mem == NULL) - { x = H5MM_malloc ((size_t) new_eof); - } else - { x = H5MM_realloc (stream->mem, (size_t) new_eof); - } if (x == NULL) - { - HRETURN_ERROR (H5E_RESOURCE, H5E_NOSPACE, FAIL, - "unable to allocate memory block"); - } + HGOTO_ERROR (H5E_RESOURCE, H5E_NOSPACE, FAIL, "unable to allocate memory block"); stream->mem = x; stream->eof = new_eof; } @@ -1156,7 +1035,8 @@ H5FD_stream_write (H5FD_t *_stream, HDmemcpy (stream->mem + addr, buf, size); stream->dirty = TRUE; - FUNC_LEAVE (SUCCEED); +done: + FUNC_LEAVE (ret_value); } #endif /* H5_HAVE_STREAM */ |